diff --git a/pulsar/consumer.go b/pulsar/consumer.go index bf2eafbf71..52e57ecc41 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -294,6 +294,9 @@ type Consumer interface { // The list of MessageID instances of all the topics that the consumer subscribed GetLastMessageIDs() ([]TopicMessageID, error) + // Closed returns a channel indicating that consumer is closed + Closed() <-chan struct{} + // Receive a single message. // This calls blocks until a message is available. Receive(context.Context) (Message, error) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 679054a044..2a3ba4d4f5 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -65,6 +65,7 @@ type consumer struct { closeOnce sync.Once closeCh chan struct{} errorCh chan error + close func() // close will be assigned only after full initialization cycle will be ready stopDiscovery func() log log.Logger @@ -288,6 +289,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string, duration = defaultAutoDiscoveryDuration } consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration) + consumer.close = consumer.closeInternal consumer.metrics.ConsumersOpened.Inc() return consumer, nil @@ -496,6 +498,10 @@ func (c *consumer) unsubscribe(force bool) error { return nil } +func (c *consumer) Closed() <-chan struct{} { + return c.closeCh +} + func (c *consumer) GetLastMessageIDs() ([]TopicMessageID, error) { ids := make([]TopicMessageID, 0) for _, pc := range c.consumers { @@ -674,6 +680,12 @@ func (c *consumer) NackID(msgID MessageID) { } func (c *consumer) Close() { + if c.close != nil { + c.close() + } +} + +func (c *consumer) closeInternal() { c.closeOnce.Do(func() { c.stopDiscovery() diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 020e38a065..56df826684 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -97,6 +97,10 @@ func (c *multiTopicConsumer) Unsubscribe() error { return errs } +func (c *multiTopicConsumer) Closed() <-chan struct{} { + return c.closeCh +} + func (c *multiTopicConsumer) UnsubscribeForce() error { var errs error for t, consumer := range c.consumers { diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index d8001dc122..3b03c9c853 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1595,7 +1595,9 @@ func (pc *partitionConsumer) runEventsLoop() { return case connectionClosed := <-pc.connectClosedCh: pc.log.Debug("runEventsLoop will reconnect") - pc.reconnectToBroker(connectionClosed) + if !pc.reconnectToBroker(connectionClosed) { + pc.parentConsumer.Close() + } } } }() @@ -1679,7 +1681,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { close(pc.closeCh) } -func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) { +func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) bool { var maxRetry int if pc.options.maxReconnectToBroker == nil { @@ -1697,7 +1699,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose if pc.getConsumerState() != consumerReady { // Consumer is already closing pc.log.Info("consumer state not ready, exit reconnect") - return + return false } var assignedBrokerURL string @@ -1722,14 +1724,14 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose if pc.getConsumerState() != consumerReady { // Consumer is already closing pc.log.Info("consumer state not ready, exit reconnect") - return + return false } err := pc.grabConn(assignedBrokerURL) if err == nil { // Successfully reconnected pc.log.Info("Reconnected consumer to broker") - return + return true } pc.log.WithError(err).Error("Failed to create consumer at reconnect") errMsg := err.Error() @@ -1747,6 +1749,9 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose pc.metrics.ConsumersReconnectMaxRetry.Inc() } } + + pc.log.Warn("Reached maximum number of reconnection attempts") + return false } func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) { diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 58cfa80fa3..8582cbe055 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -133,6 +133,10 @@ func (c *regexConsumer) Unsubscribe() error { return errs } +func (c *regexConsumer) Closed() <-chan struct{} { + return c.closeCh +} + func (c *regexConsumer) UnsubscribeForce() error { var errs error c.consumersLock.Lock() diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 04439cbcfe..d467bd81f7 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -41,6 +41,7 @@ import ( "github.com/google/uuid" "github.com/pierrec/lz4" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" ) @@ -54,7 +55,7 @@ func TestProducerConsumer(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "my-topic" @@ -66,7 +67,7 @@ func TestProducerConsumer(t *testing.T) { SubscriptionName: "my-sub", Type: Exclusive, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // create producer @@ -74,28 +75,25 @@ func TestProducerConsumer(t *testing.T) { Topic: topic, DisableBatching: false, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send 10 messages for i := 0; i < 10; i++ { - if _, err := producer.Send(ctx, &ProducerMessage{ + _, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), Key: "pulsar", Properties: map[string]string{ "key-1": "pulsar-1", }, - }); err != nil { - log.Fatal(err) - } + }) + require.NoError(t, err) } // receive 10 messages for i := 0; i < 10; i++ { msg, err := consumer.Receive(context.Background()) - if err != nil { - log.Fatal(err) - } + require.NoError(t, err) expectMsg := fmt.Sprintf("hello-%d", i) expectProperties := map[string]string{ @@ -114,7 +112,7 @@ func TestConsumerConnectError(t *testing.T) { URL: "pulsar://invalid-hostname:6650", }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() @@ -135,7 +133,7 @@ func TestBatchMessageReceive(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := "persistent://public/default/receive-batch" @@ -152,7 +150,7 @@ func TestBatchMessageReceive(t *testing.T) { BatchingMaxMessages: uint(batchSize), DisableBatching: false, }) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, topicName, producer.Topic()) defer producer.Close() @@ -160,7 +158,7 @@ func TestBatchMessageReceive(t *testing.T) { Topic: topicName, SubscriptionName: subName, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() count := 0 @@ -170,12 +168,12 @@ func TestBatchMessageReceive(t *testing.T) { Payload: []byte(messageContent), } _, err := producer.Send(ctx, msg) - assert.Nil(t, err) + require.NoError(t, err) } for i := 0; i < numOfMessages; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) consumer.Ack(msg) count++ } @@ -201,7 +199,7 @@ func TestConsumerWithInvalidConf(t *testing.T) { // Expect error in creating consumer assert.Nil(t, consumer) - assert.NotNil(t, err) + require.NoError(t, err) fmt.Println(err.Error()) assert.Equal(t, err.(*Error).Result(), SubscriptionNotFound) @@ -212,7 +210,7 @@ func TestConsumerWithInvalidConf(t *testing.T) { // Expect error in creating consumer assert.Nil(t, consumer) - assert.NotNil(t, err) + require.NoError(t, err) assert.Equal(t, err.(*Error).Result(), TopicNotFound) } @@ -222,7 +220,7 @@ func TestConsumerSubscriptionEarliestPosition(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := fmt.Sprintf("testSeek-%d", time.Now().Unix()) @@ -232,7 +230,7 @@ func TestConsumerSubscriptionEarliestPosition(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send message @@ -240,12 +238,12 @@ func TestConsumerSubscriptionEarliestPosition(t *testing.T) { _, err = producer.Send(ctx, &ProducerMessage{ Payload: []byte("msg-1-content-1"), }) - assert.Nil(t, err) + require.NoError(t, err) _, err = producer.Send(ctx, &ProducerMessage{ Payload: []byte("msg-1-content-2"), }) - assert.Nil(t, err) + require.NoError(t, err) // create consumer consumer, err := client.Subscribe(ConsumerOptions{ @@ -253,11 +251,11 @@ func TestConsumerSubscriptionEarliestPosition(t *testing.T) { SubscriptionName: subName, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, "msg-1-content-1", string(msg.Payload())) } @@ -266,7 +264,7 @@ func TestConsumerKeyShared(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/test-topic-6" @@ -276,7 +274,7 @@ func TestConsumerKeyShared(t *testing.T) { SubscriptionName: "sub-1", Type: KeyShared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer1.Close() consumer2, err := client.Subscribe(ConsumerOptions{ @@ -284,7 +282,7 @@ func TestConsumerKeyShared(t *testing.T) { SubscriptionName: "sub-1", Type: KeyShared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer2.Close() // create producer @@ -292,7 +290,7 @@ func TestConsumerKeyShared(t *testing.T) { Topic: topic, DisableBatching: true, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() ctx := context.Background() @@ -301,7 +299,7 @@ func TestConsumerKeyShared(t *testing.T) { Key: fmt.Sprintf("key-shared-%d", i%3), Payload: []byte(fmt.Sprintf("value-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) } receivedConsumer1 := 0 @@ -335,7 +333,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/testGetPartitions5" @@ -545,11 +543,11 @@ func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) { Keys: []string{"client-rsa.pem"}, }, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() topics, err := client.TopicPartitions(topic) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, topic+"-partition-0", topics[0]) assert.Equal(t, topic+"-partition-1", topics[1]) assert.Equal(t, topic+"-partition-2", topics[2]) @@ -565,7 +563,7 @@ func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) { ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail, }, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() ctx := context.Background() @@ -573,14 +571,14 @@ func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) { _, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) } msgs := make([]string, 0) for i := 0; i < 10; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) msgs = append(msgs, string(msg.Payload())) t.Logf("Received message msgId: %#v -- content: '%s'\n", @@ -596,7 +594,7 @@ func TestConsumerReceiveTimeout(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "test-topic-with-no-messages" @@ -609,7 +607,7 @@ func TestConsumerReceiveTimeout(t *testing.T) { SubscriptionName: "my-sub1", Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() msg, err := consumer.Receive(ctx) @@ -617,12 +615,75 @@ func TestConsumerReceiveTimeout(t *testing.T) { assert.NotNil(t, err) } +func TestConsumerClosed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + require.NoError(t, err) + defer client.Close() + + topic := "test-topic-for-consumer-closed" + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub-consumer-closed", + Type: Shared, + }) + require.NoError(t, err) + defer consumer.Close() + + select { + case <-consumer.Closed(): + + default: + } + + consumer.Close() + + select { + case <-consumer.Closed(): + default: + require.Fail(t, "consumer was not expected to be opened") + } +} + +func TestConsumerClosingOnReconnectionLimit(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + require.NoError(t, err) + defer client.Close() + + topic := "test-topic-reconnection-testing" + maxReconnectToBroker := uint(0) + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub-consumer-reconnections", + Type: Shared, + MaxReconnectToBroker: &maxReconnectToBroker, + }) + require.NoError(t, err) + defer consumer.Close() + + testURL := adminURL + "/" + "admin/v2/persistent/public/default/test-topic-reconnection-testing/unload" + makeHTTPCall(t, http.MethodPut, testURL, "64") + + select { + case <-consumer.Closed(): + case <-time.After(5 * time.Second): + require.Fail(t, "consumer must be closed") + } +} + func TestConsumerShared(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/testMultiPartitionConsumerShared" @@ -636,7 +697,7 @@ func TestConsumerShared(t *testing.T) { SubscriptionName: sub, Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer1.Close() consumer2, err := client.Subscribe(ConsumerOptions{ @@ -644,7 +705,7 @@ func TestConsumerShared(t *testing.T) { SubscriptionName: sub, Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer2.Close() // create producer @@ -652,7 +713,7 @@ func TestConsumerShared(t *testing.T) { Topic: topic, DisableBatching: true, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send 10 messages with unique payloads @@ -698,7 +759,7 @@ func TestConsumerEventTime(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := "test-event-time" @@ -707,14 +768,14 @@ func TestConsumerEventTime(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "sub-1", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() et := timeFromUnixTimestampMillis(uint64(5)) @@ -722,10 +783,10 @@ func TestConsumerEventTime(t *testing.T) { Payload: []byte("test"), EventTime: et, }) - assert.Nil(t, err) + require.NoError(t, err) msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, et, msg.EventTime()) assert.Equal(t, "test", string(msg.Payload())) } @@ -770,7 +831,7 @@ func TestConsumerFlow(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := "test-received-since-flow" @@ -779,7 +840,7 @@ func TestConsumerFlow(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ @@ -787,7 +848,7 @@ func TestConsumerFlow(t *testing.T) { SubscriptionName: "sub-1", ReceiverQueueSize: 4, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() for msgNum := 0; msgNum < 100; msgNum++ { @@ -800,7 +861,7 @@ func TestConsumerFlow(t *testing.T) { for msgNum := 0; msgNum < 100; msgNum++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", msgNum), string(msg.Payload())) } } @@ -810,7 +871,7 @@ func TestConsumerAck(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -819,7 +880,7 @@ func TestConsumerAck(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ @@ -827,7 +888,7 @@ func TestConsumerAck(t *testing.T) { SubscriptionName: "sub-1", Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) const N = 100 @@ -841,7 +902,7 @@ func TestConsumerAck(t *testing.T) { for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) if i < N/2 { @@ -858,13 +919,13 @@ func TestConsumerAck(t *testing.T) { SubscriptionName: "sub-1", Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // We should only receive the 2nd half of messages for i := N / 2; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) consumer.Ack(msg) @@ -1065,7 +1126,7 @@ func TestConsumerNack(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -1074,7 +1135,7 @@ func TestConsumerNack(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ @@ -1083,7 +1144,7 @@ func TestConsumerNack(t *testing.T) { Type: Shared, NackRedeliveryDelay: 1 * time.Second, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() const N = 100 @@ -1098,7 +1159,7 @@ func TestConsumerNack(t *testing.T) { for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) if i%2 == 0 { @@ -1115,7 +1176,7 @@ func TestConsumerNack(t *testing.T) { // We should only receive the odd messages for i := 1; i < N; i += 2 { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) consumer.Ack(msg) @@ -1127,7 +1188,7 @@ func TestConsumerCompression(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -1138,7 +1199,7 @@ func TestConsumerCompression(t *testing.T) { Topic: topicName, CompressionType: LZ4, }) - assert.Nil(t, err) + require.NoError(t, err) defer p1.Close() // disable batching @@ -1154,7 +1215,7 @@ func TestConsumerCompression(t *testing.T) { Topic: topicName, SubscriptionName: "sub-1", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() const N = 100 @@ -1177,7 +1238,7 @@ func TestConsumerCompression(t *testing.T) { for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-enabled", i), string(msg.Payload())) consumer.Ack(msg) } @@ -1195,7 +1256,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -1206,14 +1267,14 @@ func TestConsumerCompressionWithBatches(t *testing.T) { CompressionType: ZLib, BatchingMaxPublishDelay: 1 * time.Minute, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "sub-1", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() const N = 100 @@ -1228,7 +1289,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) { for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) consumer.Ack(msg) } @@ -1238,7 +1299,7 @@ func TestConsumerSeek(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -1248,14 +1309,14 @@ func TestConsumerSeek(t *testing.T) { Topic: topicName, DisableBatching: false, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "sub-1", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10 @@ -1265,7 +1326,7 @@ func TestConsumerSeek(t *testing.T) { id, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) if i == N-50 { seekID = id @@ -1275,16 +1336,16 @@ func TestConsumerSeek(t *testing.T) { // Don't consume all messages so some stay in queues for i := 0; i < N-20; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) consumer.Ack(msg) } err = consumer.Seek(seekID) - assert.Nil(t, err) + require.NoError(t, err) msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", N-50), string(msg.Payload())) } @@ -1292,7 +1353,7 @@ func TestConsumerSeekByTime(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -1302,44 +1363,44 @@ func TestConsumerSeekByTime(t *testing.T) { Topic: topicName, DisableBatching: false, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "my-sub", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10 const N = 1100 resetTimeStr := "100s" retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr) - assert.Nil(t, err) + require.NoError(t, err) for i := 0; i < N; i++ { _, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) } // Don't consume all messages so some stay in queues for i := 0; i < N-20; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) consumer.Ack(msg) } currentTimestamp := time.Now() err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond)) - assert.Nil(t, err) + require.NoError(t, err) for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) consumer.Ack(msg) } @@ -1432,7 +1493,7 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() dlqTopic := newTopicName() @@ -1441,7 +1502,7 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { Topic: dlqTopic, SubscriptionName: "dlq", }) - assert.Nil(t, err) + require.NoError(t, err) defer dlqConsumer.Close() topic := newTopicName() @@ -1465,14 +1526,14 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { DLQ: &dlqPolicy, Name: consumerName, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send 10 messages @@ -1630,7 +1691,7 @@ func TestDLQMultiTopics(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() dlqTopic := newTopicName() @@ -1639,7 +1700,7 @@ func TestDLQMultiTopics(t *testing.T) { Topic: dlqTopic, SubscriptionName: "dlq", }) - assert.Nil(t, err) + require.NoError(t, err) defer dlqConsumer.Close() topicPrefix := newTopicName() @@ -1665,7 +1726,7 @@ func TestDLQMultiTopics(t *testing.T) { }, }, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // create one producer for each topic @@ -1674,7 +1735,7 @@ func TestDLQMultiTopics(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, }) - assert.Nil(t, err) + require.NoError(t, err) producers[i] = producer } @@ -1742,17 +1803,17 @@ func TestRLQ(t *testing.T) { ctx := context.Background() client, err := NewClient(ClientOptions{URL: lookupURL}) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() // 1. Pre-produce N messages producer, err := client.CreateProducer(ProducerOptions{Topic: topic}) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() for i := 0; i < N; i++ { _, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MESSAGE_%d", i))}) - assert.Nil(t, err) + require.NoError(t, err) } // 2. Create consumer on the Retry Topic to reconsume N messages (maxRedeliveries+1) times @@ -1767,13 +1828,13 @@ func TestRLQ(t *testing.T) { RetryEnable: true, NackRedeliveryDelay: 1 * time.Second, }) - assert.Nil(t, err) + require.NoError(t, err) defer rlqConsumer.Close() rlqReceived := 0 for rlqReceived < N*(maxRedeliveries+1) { msg, err := rlqConsumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) rlqConsumer.ReconsumeLater(msg, 1*time.Second) rlqReceived++ } @@ -1906,13 +1967,13 @@ func TestRLQWithCustomProperties(t *testing.T) { SubscriptionName: subName, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.Nil(t, err) + require.NoError(t, err) defer dlqConsumer.Close() dlqReceived := 0 for dlqReceived < N { msg, err := dlqConsumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) dlqConsumer.Ack(msg) dlqReceived++ } @@ -1932,7 +1993,7 @@ func TestRLQWithCustomProperties(t *testing.T) { Type: Shared, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.Nil(t, err) + require.NoError(t, err) defer checkConsumer.Close() checkCtx, checkCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) @@ -2026,7 +2087,7 @@ func TestRLQMultiTopics(t *testing.T) { ctx := context.Background() client, err := NewClient(ClientOptions{URL: lookupURL}) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() // subscribe multi topics with Retry Topics @@ -2039,7 +2100,7 @@ func TestRLQMultiTopics(t *testing.T) { RetryEnable: true, NackRedeliveryDelay: 1 * time.Second, }) - assert.Nil(t, err) + require.NoError(t, err) defer rlqConsumer.Close() // subscribe DLQ Topic @@ -2048,31 +2109,31 @@ func TestRLQMultiTopics(t *testing.T) { SubscriptionName: subName, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.Nil(t, err) + require.NoError(t, err) defer dlqConsumer.Close() // create multi producers producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01}) - assert.Nil(t, err) + require.NoError(t, err) defer producer01.Close() producer02, err := client.CreateProducer(ProducerOptions{Topic: topic02}) - assert.Nil(t, err) + require.NoError(t, err) defer producer02.Close() // 1. Pre-produce N messages for every topic for i := 0; i < N; i++ { _, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))}) - assert.Nil(t, err) + require.NoError(t, err) _, err = producer02.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_02_%d", i))}) - assert.Nil(t, err) + require.NoError(t, err) } // 2. Create consumer on the Retry Topics to reconsume 2*N messages (maxRedeliveries+1) times rlqReceived := 0 for rlqReceived < 2*N*(maxRedeliveries+1) { msg, err := rlqConsumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) rlqConsumer.ReconsumeLater(msg, 1*time.Second) rlqReceived++ } @@ -2089,7 +2150,7 @@ func TestRLQMultiTopics(t *testing.T) { dlqReceived := 0 for dlqReceived < 2*N { msg, err := dlqConsumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) dlqConsumer.Ack(msg) dlqReceived++ } @@ -2109,7 +2170,7 @@ func TestRLQMultiTopics(t *testing.T) { Type: Shared, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.Nil(t, err) + require.NoError(t, err) defer checkConsumer.Close() timeoutCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) @@ -2225,7 +2286,7 @@ func TestGetDeliveryCount(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := newTopicName() @@ -2238,14 +2299,14 @@ func TestGetDeliveryCount(t *testing.T) { NackRedeliveryDelay: 1 * time.Second, Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send 10 messages @@ -2274,14 +2335,14 @@ func TestGetDeliveryCount(t *testing.T) { var msg Message for i := 0; i < 5; i++ { msg, err = consumer.Receive(context.Background()) - assert.Nil(t, err) + require.NoError(t, err) consumer.Nack(msg) } assert.Equal(t, uint32(i+1), msg.RedeliveryCount()) } msg, err := consumer.Receive(context.Background()) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, uint32(3), msg.RedeliveryCount()) } @@ -2289,7 +2350,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := newTopicName() @@ -2307,7 +2368,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) { }, PartitionsAutoDiscoveryInterval: 100 * time.Millisecond, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // Increase number of partitions to 10 @@ -2321,7 +2382,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) { SubscriptionName: "my-sub", AutoDiscoveryPeriod: 100 * time.Millisecond, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // Publish messages ensuring that they will go to all the partitions @@ -2331,14 +2392,14 @@ func TestConsumerAddTopicPartitions(t *testing.T) { Key: fmt.Sprintf("%d", i), Payload: []byte(fmt.Sprintf("hello-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) } msgs := make([]string, 0) for i := 0; i < 10; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) msgs = append(msgs, string(msg.Payload())) t.Logf("Received message msgId: %#v -- content: '%s'\n", @@ -2355,7 +2416,7 @@ func TestConsumerNegativeReceiverQueueSize(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := newTopicName() @@ -2370,14 +2431,14 @@ func TestConsumerNegativeReceiverQueueSize(t *testing.T) { } }() - assert.Nil(t, err) + require.NoError(t, err) } func TestProducerName(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := newTopicName() @@ -2388,7 +2449,7 @@ func TestProducerName(t *testing.T) { Topic: topic, Name: producerName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // create consumer @@ -2397,7 +2458,7 @@ func TestProducerName(t *testing.T) { SubscriptionName: "my-sub", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // publish 10 messages to topic @@ -2406,12 +2467,12 @@ func TestProducerName(t *testing.T) { _, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) } for i := 0; i < 10; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, msg.ProducerName(), producerName) consumer.Ack(msg) @@ -2466,7 +2527,7 @@ func TestConsumerWithInterceptors(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := newTopicName() @@ -2486,7 +2547,7 @@ func TestConsumerWithInterceptors(t *testing.T) { metric, }, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // create producer @@ -2494,7 +2555,7 @@ func TestConsumerWithInterceptors(t *testing.T) { Topic: topic, DisableBatching: false, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send 10 messages @@ -2593,7 +2654,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/test-key-based-batch-with-key-shared" @@ -2603,7 +2664,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { SubscriptionName: "sub-1", Type: KeyShared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer1.Close() consumer2, err := client.Subscribe(ConsumerOptions{ @@ -2611,7 +2672,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { SubscriptionName: "sub-1", Type: KeyShared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer2.Close() // create producer @@ -2621,7 +2682,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { BatcherBuilderType: KeyBasedBatchBuilder, BatchingMaxMessages: 10, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() ctx := context.Background() @@ -2632,7 +2693,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { Key: k, Payload: []byte(fmt.Sprintf("value-%d", i)), }, func(id MessageID, producerMessage *ProducerMessage, err error) { - assert.Nil(t, err) + require.NoError(t, err) }, ) } @@ -2695,7 +2756,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/test-ordering-of-key-based-batch-with-key-shared" @@ -2705,7 +2766,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { SubscriptionName: "sub-1", Type: KeyShared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer1.Close() // create producer @@ -2716,7 +2777,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { BatchingMaxMessages: 30, BatchingMaxPublishDelay: time.Second * 5, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() ctx := context.Background() @@ -2727,7 +2788,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { Key: k, Payload: []byte(fmt.Sprintf("value-%d", i)), }, func(id MessageID, producerMessage *ProducerMessage, err error) { - assert.Nil(t, err) + require.NoError(t, err) }, ) } @@ -2761,7 +2822,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { OrderingKey: k, Payload: []byte(fmt.Sprintf("value-%d", i)), }, func(id MessageID, producerMessage *ProducerMessage, err error) { - assert.Nil(t, err) + require.NoError(t, err) }, ) } @@ -2794,7 +2855,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { URL: lookupURL, }, ) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/test-key-shared-with-ordering-key" @@ -2806,7 +2867,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { Type: KeyShared, }, ) - assert.Nil(t, err) + require.NoError(t, err) defer consumer1.Close() consumer2, err := client.Subscribe( @@ -2816,7 +2877,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { Type: KeyShared, }, ) - assert.Nil(t, err) + require.NoError(t, err) defer consumer2.Close() // create producer @@ -2826,7 +2887,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { DisableBatching: true, }, ) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() ctx := context.Background() @@ -2839,7 +2900,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { Payload: []byte(fmt.Sprintf("value-%d", i)), }, ) - assert.Nil(t, err) + require.NoError(t, err) } receivedConsumer1 := 0 diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go index 8117127286..9e9f80215a 100644 --- a/pulsar/consumer_zero_queue.go +++ b/pulsar/consumer_zero_queue.go @@ -136,6 +136,10 @@ func (z *zeroQueueConsumer) Chan() <-chan ConsumerMessage { panic("zeroQueueConsumer cannot support Chan method") } +func (z *zeroQueueConsumer) Closed() <-chan struct{} { + return z.closeCh +} + func (z *zeroQueueConsumer) Ack(m Message) error { return z.AckID(m.ID()) } diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go index d8c8f7c9a1..69043f1e14 100644 --- a/pulsar/consumer_zero_queue_test.go +++ b/pulsar/consumer_zero_queue_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" @@ -95,6 +97,7 @@ func TestNormalZeroQueueConsumer(t *testing.T) { err = consumer.Unsubscribe() assert.Nil(t, err) } + func TestPartitionZeroQueueConsumer(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, @@ -138,6 +141,35 @@ func TestOnePartitionZeroQueueConsumer(t *testing.T) { assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics") } +func TestZeroQueueConsumerClose(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + require.NoError(t, err) + defer client.Close() + + topic := newTopicName() + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + EnableZeroQueueConsumer: true, + }) + require.NoError(t, err) + _, ok := consumer.(*zeroQueueConsumer) + assert.True(t, ok) + + consumer.Close() + + select { + case <-consumer.Closed(): + default: + assert.Fail(t, "consumer should be closed") + } +} + func TestZeroQueueConsumerGetLastMessageIDs(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go index 05953cf1a6..aae28fa77a 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -99,6 +99,10 @@ func (c *mockConsumer) NackID(msgID pulsar.MessageID) {} func (c *mockConsumer) Close() {} +func (c *mockConsumer) Closed() <-chan struct{} { + return make(chan struct{}) +} + func (c *mockConsumer) Seek(msgID pulsar.MessageID) error { return nil }