From a6d7ee7fe547aef416f97783304698bcc53b74fe Mon Sep 17 00:00:00 2001 From: Dream95 Date: Thu, 11 Jun 2026 19:13:02 +0800 Subject: [PATCH] [fix][client] Fix unAckedMessageTracker cleanup on multi-topics batch ack Signed-off-by: Dream95 --- .../client/impl/MultiTopicsConsumerImpl.java | 2 +- .../impl/MultiTopicsConsumerImplTest.java | 35 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index aa7b710ff3b9f..dad3285b53d0a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -537,7 +537,7 @@ protected CompletableFuture doAcknowledge(List messageIdList, } consumerToMessageIds.forEach((consumer, messageIds) -> { resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) - .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); + .thenAccept((res) -> messageIds.forEach(unAckedMessageTracker::remove))); }); } return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index 217b2782ea684..7192f0317b5d5 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -23,6 +23,7 @@ import static org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -320,4 +321,38 @@ public void testOnTopicsExtendedRemovedTopicCleansUnackedMessages() { verify(partitionConsumer1).closeAsync(); } + @Test + @SuppressWarnings("unchecked") + public void testBatchAcknowledgeRemovesOnlyAckedMessageIdsFromTracker() { + String topic0 = "persistent://public/default/topic-a-partition-0"; + String topic1 = "persistent://public/default/topic-b-partition-0"; + + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + conf.setSubscriptionName("subscriptionName"); + conf.setAckTimeoutMillis(1000); + MultiTopicsConsumerImpl impl = createMultiTopicsConsumer(conf); + impl.setState(HandlerState.State.Ready); + + ConsumerImpl consumer0 = mock(ConsumerImpl.class); + ConsumerImpl consumer1 = mock(ConsumerImpl.class); + CompletableFuture pendingAck = new CompletableFuture<>(); + when(consumer0.getTopic()).thenReturn(topic0); + when(consumer1.getTopic()).thenReturn(topic1); + when(consumer0.doAcknowledgeWithTxn(anyList(), any(), any(), any())).thenReturn(pendingAck); + when(consumer1.doAcknowledgeWithTxn(anyList(), any(), any(), any())).thenReturn(new CompletableFuture<>()); + + impl.consumers.put(topic0, consumer0); + impl.consumers.put(topic1, consumer1); + + TopicMessageIdImpl messageId0 = new TopicMessageIdImpl(topic0, new MessageIdImpl(1, 1, 0)); + TopicMessageIdImpl messageId1 = new TopicMessageIdImpl(topic1, new MessageIdImpl(2, 2, 0)); + impl.getUnAckedMessageTracker().add(messageId0); + impl.getUnAckedMessageTracker().add(messageId1); + + impl.acknowledgeAsync(Arrays.asList(messageId0, messageId1)); + pendingAck.complete(null); + + assertEquals(impl.getUnAckedMessageTracker().size(), 1); + } + }