diff --git a/pulsar/consumer.go b/pulsar/consumer.go index f70055433..9aeabb03c 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -240,6 +240,11 @@ type ConsumerOptions struct { Schema Schema // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate) + // When the retry budget is exhausted, or when the broker reports a non-retriable error + // (e.g. AuthorizationError, TopicNotFound, TopicTerminated, IncompatibleSchema), the + // consumer is closed. Applications can observe the close — and recover the cause as the + // err argument — by registering a ConsumerInterceptor that also implements + // ConsumerCloseInterceptor. MaxReconnectToBroker *uint // BackOffPolicyFunc parameterize the following options in the reconnection logic to diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 5579d3741..d76108a2e 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -713,6 +713,14 @@ func (c *consumer) NackID(msgID MessageID) { } func (c *consumer) Close() { + c.closeWithCause(nil) +} + +// closeWithCause closes the consumer and notifies any ConsumerCloseInterceptor +// with the supplied cause. The hook fires exactly once per consumer; the cause +// is captured by the goroutine that wins closeOnce so concurrent callers cannot +// race the value. +func (c *consumer) closeWithCause(err error) { c.closeOnce.Do(func() { c.stopDiscovery() @@ -734,6 +742,7 @@ func (c *consumer) Close() { c.rlq.close() c.metrics.ConsumersClosed.Inc() c.metrics.ConsumersPartitions.Sub(float64(len(c.consumers))) + c.options.Interceptors.OnConsumerClose(c, err) }) } diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go index db46b7842..4ad6546c5 100644 --- a/pulsar/consumer_interceptor.go +++ b/pulsar/consumer_interceptor.go @@ -28,6 +28,15 @@ type ConsumerInterceptor interface { OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) } +// ConsumerCloseInterceptor is an optional interface that a ConsumerInterceptor +// may also implement to be notified once the consumer has been closed. The hook +// fires exactly once per consumer. When err is nil, the close was initiated by +// the user; a non-nil err carries the cause that triggered an internal close +// (for example, exhausting MaxReconnectToBroker or a non-retriable broker error). +type ConsumerCloseInterceptor interface { + OnConsumerClose(consumer Consumer, err error) +} + type ConsumerInterceptors []ConsumerInterceptor func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) { @@ -48,4 +57,12 @@ func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []Mes } } +func (x ConsumerInterceptors) OnConsumerClose(consumer Consumer, err error) { + for i := range x { + if c, ok := x[i].(ConsumerCloseInterceptor); ok { + c.OnConsumerClose(consumer, err) + } + } +} + var defaultConsumerInterceptors = make(ConsumerInterceptors, 0) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 8c6d89168..d7aba5244 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -93,6 +93,56 @@ const ( noMessageEntry = -1 ) +// Broker error markers. When the broker reports any of these in response +// to a Subscribe command, retrying will not recover — the consumer must give up and notify +// the application. errMsgTopicNotFound and errMsgTopicTerminated are reused from the producer +// path; the rest are consumer-specific. +const ( + errMsgConsumerSubscriptionNotFound = "SubscriptionNotFound" + errMsgConsumerAuthorizationError = "AuthorizationError" + errMsgConsumerBusy = "ConsumerBusy" + errMsgConsumerInvalidTopicName = "InvalidTopicName" + errMsgConsumerIncompatibleSchema = "IncompatibleSchema" + errMsgConsumerAssignError = "ConsumerAssignError" + errMsgConsumerNotAllowedError = "NotAllowedError" +) + +// nonRetriableSubscribeErrorMarkers is the full set of broker error names that should +// terminate the reconnect loop immediately. Detection is by substring match on the +// error message, since the broker error arrives wire-formatted as ": " +// (see grabConn -> BaseCommand_ERROR handling). +var nonRetriableSubscribeErrorMarkers = []string{ + errMsgTopicNotFound, + errMsgTopicTerminated, + errMsgConsumerSubscriptionNotFound, + errMsgConsumerAuthorizationError, + errMsgConsumerBusy, + errMsgConsumerInvalidTopicName, + errMsgConsumerIncompatibleSchema, + errMsgConsumerAssignError, + errMsgConsumerNotAllowedError, +} + +func isNonRetriableSubscribeError(err error) bool { + if err == nil { + return false + } + msg := err.Error() + for _, marker := range nonRetriableSubscribeErrorMarkers { + if strings.Contains(msg, marker) { + return true + } + } + return false +} + +// causalCloser is implemented by Consumer wrappers that can record the reason +// they were closed and forward it to a ConsumerCloseInterceptor. The reconnect +// loop uses this to surface the underlying error when it gives up. +type causalCloser interface { + closeWithCause(err error) +} + type partitionConsumerOpts struct { topic string consumerName string @@ -2062,6 +2112,19 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose assignedBrokerURL = connectionClosed.assignedBrokerURL } + var giveUpNotified bool + notifyReconnectGiveUp := func(cause error) { + if giveUpNotified { + return + } + giveUpNotified = true + if cc, ok := pc.parentConsumer.(causalCloser); ok { + go cc.closeWithCause(cause) + } else { + go pc.parentConsumer.Close() + } + } + opFn := func() (struct{}, error) { if maxRetry == 0 { return struct{}{}, nil @@ -2088,10 +2151,9 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose return struct{}{}, nil } pc.log.WithError(err).Error("Failed to create consumer at reconnect") - errMsg := err.Error() - if strings.Contains(errMsg, errMsgTopicNotFound) { - // when topic is deleted, we should give up reconnection. - pc.log.Warn("Topic Not Found.") + if isNonRetriableSubscribeError(err) { + pc.log.WithError(err).Warn("Non-retriable error during reconnect, giving up") + notifyReconnectGiveUp(err) return struct{}{}, nil } @@ -2099,8 +2161,10 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose maxRetry-- } pc.metrics.ConsumersReconnectFailure.Inc() - if maxRetry == 0 || bo.IsMaxBackoffReached() { + if maxRetry == 0 { pc.metrics.ConsumersReconnectMaxRetry.Inc() + notifyReconnectGiveUp(errors.New("max retry attempts reached for reconnecting to broker")) + return struct{}{}, nil } return struct{}{}, err diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 4e2cbc4a9..e19642871 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5927,3 +5927,139 @@ func TestSelectConnectionForSameConsumer(t *testing.T) { "The consumer uses a different connection when reconnecting") } } + +// closeInterceptor captures the (consumer, err) pair delivered to +// ConsumerCloseInterceptor.OnConsumerClose and signals via fired. +type closeInterceptor struct { + fired chan struct{} + consumer Consumer + err error + once sync.Once +} + +func (c *closeInterceptor) BeforeConsume(_ ConsumerMessage) {} +func (c *closeInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {} +func (c *closeInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {} +func (c *closeInterceptor) OnConsumerClose(consumer Consumer, err error) { + c.once.Do(func() { + c.consumer = consumer + c.err = err + close(c.fired) + }) +} + +func TestConsumerOnCloseInterceptorOnMaxReconnect(t *testing.T) { + req := testcontainers.ContainerRequest{ + Image: getPulsarTestImage(), + ExposedPorts: []string{"6650/tcp", "8080/tcp"}, + WaitingFor: wait.ForExposedPort(), + Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"}, + } + c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + require.NoError(t, err) + t.Cleanup(func() { + if err := c.Terminate(context.Background()); err != nil { + t.Logf("container terminate (cleanup) returned: %v", err) + } + }) + endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar") + require.NoError(t, err) + + pulsarClient, err := NewClient(ClientOptions{ + URL: endpoint, + ConnectionTimeout: 3 * time.Second, + OperationTimeout: 5 * time.Second, + }) + require.NoError(t, err) + defer pulsarClient.Close() + + maxRetry := uint(1) + interceptor := &closeInterceptor{fired: make(chan struct{})} + + topic := newTopicName() + var testConsumer Consumer + require.Eventually(t, func() bool { + testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "test-on-close-interceptor", + MaxReconnectToBroker: &maxRetry, + BackOffPolicyFunc: func() backoff.Policy { + return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second) + }, + Interceptors: ConsumerInterceptors{interceptor}, + }) + return err == nil + }, 30*time.Second, 1*time.Second) + defer testConsumer.Close() + + require.NoError(t, c.Terminate(context.Background())) + + select { + case <-interceptor.fired: + case <-time.After(30 * time.Second): + t.Fatal("OnConsumerClose was not called within timeout") + } + + assert.NotNil(t, interceptor.err, "interceptor should receive the cause of the close") + assert.Equal(t, testConsumer, interceptor.consumer, "interceptor should receive the parent consumer") + + pc := testConsumer.(*consumer).consumers[0] + require.Eventually(t, func() bool { + return pc.getConsumerState() == consumerClosed + }, 30*time.Second, 100*time.Millisecond, "consumer should be closed after exhausting max reconnect retries") +} + +func TestConsumerOnCloseInterceptorOnUserClose(t *testing.T) { + client, err := NewClient(ClientOptions{URL: serviceURL}) + require.NoError(t, err) + defer client.Close() + + interceptor := &closeInterceptor{fired: make(chan struct{})} + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: newTopicName(), + SubscriptionName: "test-on-close-user", + Interceptors: ConsumerInterceptors{interceptor}, + }) + require.NoError(t, err) + + consumer.Close() + + select { + case <-interceptor.fired: + case <-time.After(5 * time.Second): + t.Fatal("OnConsumerClose was not called within timeout") + } + + assert.Nil(t, interceptor.err, "user-initiated close should report nil cause") + assert.Equal(t, consumer, interceptor.consumer) +} + +func TestIsNonRetriableSubscribeError(t *testing.T) { + cases := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"topic not found", errors.New("TopicNotFound: topic does not exist"), true}, + {"topic terminated", errors.New("TopicTerminatedError: topic was terminated"), true}, + {"subscription not found", errors.New("SubscriptionNotFound: sub does not exist"), true}, + {"authorization", errors.New("AuthorizationError: not authorized"), true}, + {"consumer busy", errors.New("ConsumerBusy: another consumer attached"), true}, + {"invalid topic name", errors.New("InvalidTopicName: bad name"), true}, + {"incompatible schema", errors.New("IncompatibleSchema: schema mismatch"), true}, + {"consumer assign error", errors.New("ConsumerAssignError: dispatcher assign failed"), true}, + {"not allowed", errors.New("NotAllowedError: action not permitted"), true}, + {"service not ready (retriable)", errors.New("ServiceNotReady: please retry"), false}, + {"metadata error (retriable)", errors.New("MetadataError: zk timeout"), false}, + {"plain network error (retriable)", errors.New("dial tcp: i/o timeout"), false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, isNonRetriableSubscribeError(tc.err)) + }) + } +} diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go index 97016c381..4978fae25 100644 --- a/pulsar/consumer_zero_queue.go +++ b/pulsar/consumer_zero_queue.go @@ -261,6 +261,12 @@ func (z *zeroQueueConsumer) NackID(msgID MessageID) { } func (z *zeroQueueConsumer) Close() { + z.closeWithCause(nil) +} + +// closeWithCause closes the consumer and notifies any ConsumerCloseInterceptor +// with the supplied cause. The hook fires exactly once per consumer. +func (z *zeroQueueConsumer) closeWithCause(err error) { z.closeOnce.Do(func() { z.Lock() defer z.Unlock() @@ -272,6 +278,7 @@ func (z *zeroQueueConsumer) Close() { z.rlq.close() z.metrics.ConsumersClosed.Inc() z.metrics.ConsumersPartitions.Sub(float64(1)) + z.options.Interceptors.OnConsumerClose(z, err) }) }