From 6d9d5001ef0847695014853be1a7cccac0f4d5f9 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Wed, 21 Jan 2026 22:07:12 +0800 Subject: [PATCH 1/7] [Issue #1458] educe unnecessary creation of retry topics and DLQ topics --- pulsar/consumer_impl.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 08c825e3e9..734ebf2590 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -150,19 +150,18 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix - if r, err := client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil && - r != nil && - r.Partitions > 0 { - retryTopic = oldRetryTopic - } - - if r, err := client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil && - r != nil && - r.Partitions > 0 { - dlqTopic = oldDlqTopic + CheckTopicIsExists := func(topic string) bool { + r, err := client.lookupService.GetPartitionedTopicMetadata(topic) + return err == nil && r != nil && r.Partitions > 0 } if options.DLQ == nil { + if CheckTopicIsExists(oldRetryTopic) { + retryTopic = oldRetryTopic + } + if CheckTopicIsExists(oldDlqTopic) { + dlqTopic = oldDlqTopic + } options.DLQ = &DLQPolicy{ MaxDeliveries: MaxReconsumeTimes, DeadLetterTopic: dlqTopic, @@ -170,12 +169,19 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } } else { if options.DLQ.DeadLetterTopic == "" { + if CheckTopicIsExists(oldDlqTopic) { + dlqTopic = oldDlqTopic + } options.DLQ.DeadLetterTopic = dlqTopic } if options.DLQ.RetryLetterTopic == "" { + if CheckTopicIsExists(oldRetryTopic) { + retryTopic = oldRetryTopic + } options.DLQ.RetryLetterTopic = retryTopic } } + if options.Topic != "" && len(options.Topics) == 0 { options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic} options.Topic = "" From aab42bee492ea75cf21268965ac59469791e8099 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Fri, 23 Jan 2026 12:27:04 +0800 Subject: [PATCH 2/7] [Issue #1458] educe unnecessary creation of retry topics and DLQ topics --- pulsar/consumer_impl.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 734ebf2590..fc46366a96 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -155,6 +155,8 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return err == nil && r != nil && r.Partitions > 0 } + //If the retryTopic or DLQTopic has already been set, + //there is no need to perform GetPartitiondTopicMetadata operation on oldRetroTopic or oldDLQTopic if options.DLQ == nil { if CheckTopicIsExists(oldRetryTopic) { retryTopic = oldRetryTopic From 8c4d376a963ee09e8e6f27b83644e551a60dd72c Mon Sep 17 00:00:00 2001 From: zhenJiangWang Date: Tue, 3 Feb 2026 16:01:48 +0800 Subject: [PATCH 3/7] Update pulsar/consumer_impl.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pulsar/consumer_impl.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index fc46366a96..47bee7b393 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -155,8 +155,8 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return err == nil && r != nil && r.Partitions > 0 } - //If the retryTopic or DLQTopic has already been set, - //there is no need to perform GetPartitiondTopicMetadata operation on oldRetroTopic or oldDLQTopic + // Check for old topic naming format. + // When DLQ policy is not provided, check both old topics for backward compatibility. if options.DLQ == nil { if CheckTopicIsExists(oldRetryTopic) { retryTopic = oldRetryTopic From ad67f75d3787a06a698ea55f451b05adf7c36fc0 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Tue, 3 Feb 2026 16:10:19 +0800 Subject: [PATCH 4/7] [Issue #1458] educe unnecessary creation of retry topics and DLQ topics --- pulsar/consumer_impl.go | 48 ++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 47bee7b393..0e057c25d8 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -150,39 +150,37 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix - CheckTopicIsExists := func(topic string) bool { + // Check for old topic naming format. + // When DLQ policy is not provided, check both old topics for backward compatibility. + checkTopicIsExists := func(topic string) bool { r, err := client.lookupService.GetPartitionedTopicMetadata(topic) return err == nil && r != nil && r.Partitions > 0 } - - // Check for old topic naming format. - // When DLQ policy is not provided, check both old topics for backward compatibility. - if options.DLQ == nil { - if CheckTopicIsExists(oldRetryTopic) { - retryTopic = oldRetryTopic + resolveTopic := func(current, old string, new string) string { + if current != "" { + return current } - if CheckTopicIsExists(oldDlqTopic) { - dlqTopic = oldDlqTopic + if checkTopicIsExists(old) { + return old + } else { + return new } + } + if options.DLQ == nil { options.DLQ = &DLQPolicy{ - MaxDeliveries: MaxReconsumeTimes, - DeadLetterTopic: dlqTopic, - RetryLetterTopic: retryTopic, - } - } else { - if options.DLQ.DeadLetterTopic == "" { - if CheckTopicIsExists(oldDlqTopic) { - dlqTopic = oldDlqTopic - } - options.DLQ.DeadLetterTopic = dlqTopic - } - if options.DLQ.RetryLetterTopic == "" { - if CheckTopicIsExists(oldRetryTopic) { - retryTopic = oldRetryTopic - } - options.DLQ.RetryLetterTopic = retryTopic + MaxDeliveries: MaxReconsumeTimes, } } + options.DLQ.DeadLetterTopic = resolveTopic( + options.DLQ.DeadLetterTopic, + oldDlqTopic, + dlqTopic, + ) + options.DLQ.RetryLetterTopic = resolveTopic( + options.DLQ.RetryLetterTopic, + oldRetryTopic, + retryTopic, + ) if options.Topic != "" && len(options.Topics) == 0 { options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic} From 0ab09c38f5ff9ede9445115cf5a8a31d5a1addc4 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Tue, 3 Feb 2026 21:43:41 +0800 Subject: [PATCH 5/7] [Issue #1458] educe unnecessary creation of retry topics and DLQ topics --- pulsar/consumer_test.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 9f20ed8417..0955e59ef4 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5710,3 +5710,40 @@ func TestSelectConnectionForSameConsumer(t *testing.T) { "The consumer uses a different connection when reconnecting") } } + +/** + * Test that GetPartitionedTopicMetadata is not called when DLQ and Retry topics are provided. + */ +func TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata(t *testing.T) { + + type mockLookupService struct { + getPartitionedTopicMetadataCalled bool + callCount int + } + + mockLS := &mockLookupService{} + + client, _ := NewClient(ClientOptions{ + URL: serviceURL, + MaxConnectionsPerBroker: 10, + }) + + options1 := ConsumerOptions{ + Topic: "persistent://public/default/test-topic", + SubscriptionName: "test-subscription", + RetryEnable: true, + DLQ: &DLQPolicy{ + MaxDeliveries: 3, + DeadLetterTopic: "persistent://public/default/my-dlq-topic", // 用户自定义的 DLQ topic + RetryLetterTopic: "persistent://public/default/my-retry-topic", // 用户自定义的 Retry topic + }, + } + + _, _ = client.Subscribe(options1) + + assert.False(t, mockLS.getPartitionedTopicMetadataCalled, + "GetPartitionedTopicMetadata should not be called when custom DLQ and Retry topics are provided") + assert.Equal(t, 0, mockLS.callCount, + "GetPartitionedTopicMetadata call count should be 0 when custom topics are provided") + +} From 545fe0177a6a4b2f75d2742c7f1f48842b866f0a Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 11 Mar 2026 08:55:25 +0800 Subject: [PATCH 6/7] fix lint --- pulsar/consumer_impl.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 0e057c25d8..45c0a0e72e 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -156,15 +156,14 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { r, err := client.lookupService.GetPartitionedTopicMetadata(topic) return err == nil && r != nil && r.Partitions > 0 } - resolveTopic := func(current, old string, new string) string { + resolveTopic := func(current, old, defaultTopic string) string { if current != "" { return current } if checkTopicIsExists(old) { return old - } else { - return new } + return defaultTopic } if options.DLQ == nil { options.DLQ = &DLQPolicy{ From 69dcc0edb93decb09cfc173781858322920b7084 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Thu, 30 Apr 2026 14:44:41 +0800 Subject: [PATCH 7/7] [Issue #1458] educe unnecessary creation of retry topics and DLQ topics --- pulsar/consumer_test.go | 72 +++++++++++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 21 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 0955e59ef4..144ea9c6fa 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5711,39 +5711,69 @@ func TestSelectConnectionForSameConsumer(t *testing.T) { } } -/** - * Test that GetPartitionedTopicMetadata is not called when DLQ and Retry topics are provided. - */ -func TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata(t *testing.T) { - - type mockLookupService struct { - getPartitionedTopicMetadataCalled bool - callCount int - } +// lookupServiceWrapper embeds the original LookupService and only overrides +// GetPartitionedTopicMetadata to record the topics that were queried. +type lookupServiceWrapper struct { + internal.LookupService + mu sync.Mutex + calledTopics []string +} - mockLS := &mockLookupService{} +func (w *lookupServiceWrapper) GetPartitionedTopicMetadata(topic string) (*internal.PartitionedTopicMetadata, error) { + w.mu.Lock() + w.calledTopics = append(w.calledTopics, topic) + w.mu.Unlock() + return &internal.PartitionedTopicMetadata{Partitions: 0}, nil +} - client, _ := NewClient(ClientOptions{ +// TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata verifies that when custom DLQ and Retry +// topics are provided in DLQPolicy, the resolveTopic function should NOT call GetPartitionedTopicMetadata +// to check old-format DLQ/Retry topics. This ensures the optimization path works correctly. +func TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata(t *testing.T) { + // Create a real client with a short operation timeout so that the subsequent + // consumer creation (which requires a broker connection) fails quickly. + c, err := NewClient(ClientOptions{ URL: serviceURL, MaxConnectionsPerBroker: 10, + OperationTimeout: 1 * time.Second, }) + assert.NoError(t, err) + defer c.Close() + + // Replace the client's internal lookupService with our wrapper to intercept + // and record all GetPartitionedTopicMetadata calls. + realClient := c.(*client) + wrapper := &lookupServiceWrapper{LookupService: realClient.lookupService} + realClient.lookupService = wrapper - options1 := ConsumerOptions{ + // Subscribe with custom DLQ and Retry topics specified. + // The Subscribe call will fail due to no broker connection, but the + // resolveTopic logic executes before the connection attempt. + _, _ = c.Subscribe(ConsumerOptions{ Topic: "persistent://public/default/test-topic", SubscriptionName: "test-subscription", RetryEnable: true, DLQ: &DLQPolicy{ MaxDeliveries: 3, - DeadLetterTopic: "persistent://public/default/my-dlq-topic", // 用户自定义的 DLQ topic - RetryLetterTopic: "persistent://public/default/my-retry-topic", // 用户自定义的 Retry topic + DeadLetterTopic: "persistent://public/default/my-dlq-topic", + RetryLetterTopic: "persistent://public/default/my-retry-topic", }, - } - - _, _ = client.Subscribe(options1) + }) - assert.False(t, mockLS.getPartitionedTopicMetadataCalled, - "GetPartitionedTopicMetadata should not be called when custom DLQ and Retry topics are provided") - assert.Equal(t, 0, mockLS.callCount, - "GetPartitionedTopicMetadata call count should be 0 when custom topics are provided") + // These are the old-format topics that resolveTopic would check via + // GetPartitionedTopicMetadata if no custom topics were provided. + oldDlqTopic := "persistent://public/default/test-subscription" + DlqTopicSuffix + oldRetryTopic := "persistent://public/default/test-subscription" + RetryTopicSuffix + // Verify that GetPartitionedTopicMetadata was never called with old-format topics. + // When custom DLQ/Retry topics are provided, resolveTopic should return them directly + // without checking whether old-format topics exist. + wrapper.mu.Lock() + defer wrapper.mu.Unlock() + for _, topic := range wrapper.calledTopics { + assert.NotEqual(t, oldDlqTopic, topic, + "GetPartitionedTopicMetadata should not be called with old DLQ topic when custom DLQ topic is provided") + assert.NotEqual(t, oldRetryTopic, topic, + "GetPartitionedTopicMetadata should not be called with old Retry topic when custom Retry topic is provided") + } }