Skip to content
Merged
5 changes: 5 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ type ConsumerOptions struct {
Schema Schema

// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
// When the retry budget is exhausted, or when the broker reports a non-retriable error
// (e.g. AuthorizationError, TopicNotFound, TopicTerminated, IncompatibleSchema), the
// consumer is closed. Applications can observe the close — and recover the cause as the
// err argument — by registering a ConsumerInterceptor that also implements
Comment thread
nodece marked this conversation as resolved.
// ConsumerCloseInterceptor.
MaxReconnectToBroker *uint
Comment thread
nodece marked this conversation as resolved.

// BackOffPolicyFunc parameterize the following options in the reconnection logic to
Expand Down
9 changes: 9 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,14 @@ func (c *consumer) NackID(msgID MessageID) {
}

func (c *consumer) Close() {
c.closeWithCause(nil)
}

// closeWithCause closes the consumer and notifies any ConsumerCloseInterceptor
// with the supplied cause. The hook fires exactly once per consumer; the cause
// is captured by the goroutine that wins closeOnce so concurrent callers cannot
// race the value.
func (c *consumer) closeWithCause(err error) {
c.closeOnce.Do(func() {
c.stopDiscovery()

Expand All @@ -734,6 +742,7 @@ func (c *consumer) Close() {
c.rlq.close()
c.metrics.ConsumersClosed.Inc()
c.metrics.ConsumersPartitions.Sub(float64(len(c.consumers)))
c.options.Interceptors.OnConsumerClose(c, err)
})
}

Expand Down
17 changes: 17 additions & 0 deletions pulsar/consumer_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ type ConsumerInterceptor interface {
OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
}

// ConsumerCloseInterceptor is an optional interface that a ConsumerInterceptor
// may also implement to be notified once the consumer has been closed. The hook
// fires exactly once per consumer. When err is nil, the close was initiated by
// the user; a non-nil err carries the cause that triggered an internal close
// (for example, exhausting MaxReconnectToBroker or a non-retriable broker error).
Comment thread
nodece marked this conversation as resolved.
type ConsumerCloseInterceptor interface {
OnConsumerClose(consumer Consumer, err error)
}

type ConsumerInterceptors []ConsumerInterceptor

func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) {
Expand All @@ -48,4 +57,12 @@ func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []Mes
}
}

func (x ConsumerInterceptors) OnConsumerClose(consumer Consumer, err error) {
for i := range x {
if c, ok := x[i].(ConsumerCloseInterceptor); ok {
c.OnConsumerClose(consumer, err)
}
}
}

var defaultConsumerInterceptors = make(ConsumerInterceptors, 0)
74 changes: 69 additions & 5 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,56 @@ const (
noMessageEntry = -1
)

// Broker error markers. When the broker reports any of these in response
// to a Subscribe command, retrying will not recover — the consumer must give up and notify
// the application. errMsgTopicNotFound and errMsgTopicTerminated are reused from the producer
// path; the rest are consumer-specific.
const (
errMsgConsumerSubscriptionNotFound = "SubscriptionNotFound"
errMsgConsumerAuthorizationError = "AuthorizationError"
errMsgConsumerBusy = "ConsumerBusy"
errMsgConsumerInvalidTopicName = "InvalidTopicName"
errMsgConsumerIncompatibleSchema = "IncompatibleSchema"
errMsgConsumerAssignError = "ConsumerAssignError"
errMsgConsumerNotAllowedError = "NotAllowedError"
)

// nonRetriableSubscribeErrorMarkers is the full set of broker error names that should
// terminate the reconnect loop immediately. Detection is by substring match on the
// error message, since the broker error arrives wire-formatted as "<ServerError>: <msg>"
// (see grabConn -> BaseCommand_ERROR handling).
var nonRetriableSubscribeErrorMarkers = []string{
errMsgTopicNotFound,
errMsgTopicTerminated,
errMsgConsumerSubscriptionNotFound,
errMsgConsumerAuthorizationError,
errMsgConsumerBusy,
errMsgConsumerInvalidTopicName,
errMsgConsumerIncompatibleSchema,
errMsgConsumerAssignError,
errMsgConsumerNotAllowedError,
}

func isNonRetriableSubscribeError(err error) bool {
if err == nil {
return false
}
msg := err.Error()
for _, marker := range nonRetriableSubscribeErrorMarkers {
if strings.Contains(msg, marker) {
return true
}
}
return false
}
Comment thread
nodece marked this conversation as resolved.

// causalCloser is implemented by Consumer wrappers that can record the reason
// they were closed and forward it to a ConsumerCloseInterceptor. The reconnect
// loop uses this to surface the underlying error when it gives up.
type causalCloser interface {
closeWithCause(err error)
}

type partitionConsumerOpts struct {
topic string
consumerName string
Expand Down Expand Up @@ -2062,6 +2112,19 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
assignedBrokerURL = connectionClosed.assignedBrokerURL
}

var giveUpNotified bool
notifyReconnectGiveUp := func(cause error) {
if giveUpNotified {
return
}
giveUpNotified = true
if cc, ok := pc.parentConsumer.(causalCloser); ok {
go cc.closeWithCause(cause)
} else {
go pc.parentConsumer.Close()
}
}
Comment thread
PavelZeger marked this conversation as resolved.

opFn := func() (struct{}, error) {
if maxRetry == 0 {
return struct{}{}, nil
Expand All @@ -2088,19 +2151,20 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
return struct{}{}, nil
}
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
pc.log.Warn("Topic Not Found.")
if isNonRetriableSubscribeError(err) {
pc.log.WithError(err).Warn("Non-retriable error during reconnect, giving up")
notifyReconnectGiveUp(err)
return struct{}{}, nil
}

if maxRetry > 0 {
maxRetry--
}
pc.metrics.ConsumersReconnectFailure.Inc()
if maxRetry == 0 || bo.IsMaxBackoffReached() {
if maxRetry == 0 {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
Comment thread
nodece marked this conversation as resolved.
notifyReconnectGiveUp(errors.New("max retry attempts reached for reconnecting to broker"))
return struct{}{}, nil
}

return struct{}{}, err
Expand Down
136 changes: 136 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5927,3 +5927,139 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
"The consumer uses a different connection when reconnecting")
}
}

// closeInterceptor captures the (consumer, err) pair delivered to
// ConsumerCloseInterceptor.OnConsumerClose and signals via fired.
type closeInterceptor struct {
fired chan struct{}
consumer Consumer
err error
once sync.Once
}

func (c *closeInterceptor) BeforeConsume(_ ConsumerMessage) {}
func (c *closeInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {}
func (c *closeInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {}
func (c *closeInterceptor) OnConsumerClose(consumer Consumer, err error) {
c.once.Do(func() {
c.consumer = consumer
c.err = err
close(c.fired)
})
}

func TestConsumerOnCloseInterceptorOnMaxReconnect(t *testing.T) {
req := testcontainers.ContainerRequest{
Image: getPulsarTestImage(),
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
WaitingFor: wait.ForExposedPort(),
Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
}
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
require.NoError(t, err)
t.Cleanup(func() {
if err := c.Terminate(context.Background()); err != nil {
t.Logf("container terminate (cleanup) returned: %v", err)
}
})
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
require.NoError(t, err)
Comment thread
PavelZeger marked this conversation as resolved.

pulsarClient, err := NewClient(ClientOptions{
URL: endpoint,
ConnectionTimeout: 3 * time.Second,
OperationTimeout: 5 * time.Second,
})
require.NoError(t, err)
defer pulsarClient.Close()

maxRetry := uint(1)
interceptor := &closeInterceptor{fired: make(chan struct{})}

topic := newTopicName()
var testConsumer Consumer
require.Eventually(t, func() bool {
testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "test-on-close-interceptor",
MaxReconnectToBroker: &maxRetry,
BackOffPolicyFunc: func() backoff.Policy {
return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
},
Comment thread
PavelZeger marked this conversation as resolved.
Interceptors: ConsumerInterceptors{interceptor},
})
return err == nil
}, 30*time.Second, 1*time.Second)
defer testConsumer.Close()

require.NoError(t, c.Terminate(context.Background()))

select {
case <-interceptor.fired:
case <-time.After(30 * time.Second):
t.Fatal("OnConsumerClose was not called within timeout")
}

assert.NotNil(t, interceptor.err, "interceptor should receive the cause of the close")
assert.Equal(t, testConsumer, interceptor.consumer, "interceptor should receive the parent consumer")

pc := testConsumer.(*consumer).consumers[0]
require.Eventually(t, func() bool {
return pc.getConsumerState() == consumerClosed
}, 30*time.Second, 100*time.Millisecond, "consumer should be closed after exhausting max reconnect retries")
}
Comment thread
nodece marked this conversation as resolved.

func TestConsumerOnCloseInterceptorOnUserClose(t *testing.T) {
client, err := NewClient(ClientOptions{URL: serviceURL})
require.NoError(t, err)
defer client.Close()

interceptor := &closeInterceptor{fired: make(chan struct{})}
consumer, err := client.Subscribe(ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: "test-on-close-user",
Interceptors: ConsumerInterceptors{interceptor},
})
require.NoError(t, err)

consumer.Close()

select {
case <-interceptor.fired:
case <-time.After(5 * time.Second):
t.Fatal("OnConsumerClose was not called within timeout")
}

assert.Nil(t, interceptor.err, "user-initiated close should report nil cause")
assert.Equal(t, consumer, interceptor.consumer)
}

func TestIsNonRetriableSubscribeError(t *testing.T) {
cases := []struct {
name string
err error
want bool
}{
{"nil", nil, false},
{"topic not found", errors.New("TopicNotFound: topic does not exist"), true},
{"topic terminated", errors.New("TopicTerminatedError: topic was terminated"), true},
{"subscription not found", errors.New("SubscriptionNotFound: sub does not exist"), true},
{"authorization", errors.New("AuthorizationError: not authorized"), true},
{"consumer busy", errors.New("ConsumerBusy: another consumer attached"), true},
{"invalid topic name", errors.New("InvalidTopicName: bad name"), true},
{"incompatible schema", errors.New("IncompatibleSchema: schema mismatch"), true},
{"consumer assign error", errors.New("ConsumerAssignError: dispatcher assign failed"), true},
{"not allowed", errors.New("NotAllowedError: action not permitted"), true},
{"service not ready (retriable)", errors.New("ServiceNotReady: please retry"), false},
{"metadata error (retriable)", errors.New("MetadataError: zk timeout"), false},
{"plain network error (retriable)", errors.New("dial tcp: i/o timeout"), false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.want, isNonRetriableSubscribeError(tc.err))
})
}
}
Comment thread
nodece marked this conversation as resolved.
7 changes: 7 additions & 0 deletions pulsar/consumer_zero_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ func (z *zeroQueueConsumer) NackID(msgID MessageID) {
}

func (z *zeroQueueConsumer) Close() {
z.closeWithCause(nil)
}

// closeWithCause closes the consumer and notifies any ConsumerCloseInterceptor
// with the supplied cause. The hook fires exactly once per consumer.
func (z *zeroQueueConsumer) closeWithCause(err error) {
z.closeOnce.Do(func() {
z.Lock()
defer z.Unlock()
Expand All @@ -272,6 +278,7 @@ func (z *zeroQueueConsumer) Close() {
z.rlq.close()
z.metrics.ConsumersClosed.Inc()
z.metrics.ConsumersPartitions.Sub(float64(1))
z.options.Interceptors.OnConsumerClose(z, err)
})
}

Expand Down
Loading