diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index be33c8ced295c..7d2515ba183f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -135,6 +135,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i int filteredMessageCount = 0; int filteredEntryCount = 0; long filteredBytesCount = 0; + final boolean hasFilter = this.hasFilter; List entriesToFiltered = hasFilter ? new ArrayList<>() : null; List entriesToRedeliver = hasFilter ? new ArrayList<>() : null; for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) { @@ -161,29 +162,27 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i this.filterProcessedMsgs.add(entryMsgCnt); } - EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer); - if (filterResult == EntryFilter.FilterResult.REJECT) { - entriesToFiltered.add(entry.getPosition()); - entries.set(i, null); - // FilterResult will be always `ACCEPTED` when there is No Filter - // dont need to judge whether `hasFilter` is true or not. - this.filterRejectedMsgs.add(entryMsgCnt); - filteredEntryCount++; - filteredMessageCount += entryMsgCnt; - filteredBytesCount += metadataAndPayload.readableBytes(); - entry.release(); - continue; - } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) { - entriesToRedeliver.add(entry.getPosition()); - entries.set(i, null); - // FilterResult will be always `ACCEPTED` when there is No Filter - // dont need to judge whether `hasFilter` is true or not. - this.filterRescheduledMsgs.add(entryMsgCnt); - filteredEntryCount++; - filteredMessageCount += entryMsgCnt; - filteredBytesCount += metadataAndPayload.readableBytes(); - entry.release(); - continue; + if (hasFilter) { + EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer); + if (filterResult == EntryFilter.FilterResult.REJECT) { + entriesToFiltered.add(entry.getPosition()); + entries.set(i, null); + this.filterRejectedMsgs.add(entryMsgCnt); + filteredEntryCount++; + filteredMessageCount += entryMsgCnt; + filteredBytesCount += metadataAndPayload.readableBytes(); + entry.release(); + continue; + } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) { + entriesToRedeliver.add(entry.getPosition()); + entries.set(i, null); + this.filterRescheduledMsgs.add(entryMsgCnt); + filteredEntryCount++; + filteredMessageCount += entryMsgCnt; + filteredBytesCount += metadataAndPayload.readableBytes(); + entry.release(); + continue; + } } if (msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java index 55912305c758b..b3b12505a0f11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java @@ -85,6 +85,30 @@ public void testFilterEntriesForConsumerOfNullElement() { assertEquals(size, 0); } + @Test + public void testFilterEntriesForConsumerSkipsFilterWhenNoFiltersConfigured() { + EntriesAndExpectedMetadata entries = createEntriesWithVaryingBatchSizes(MANY_ENTRIES_COUNT); + + SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); + EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.entries.size()); + try { + int size = this.helper.filterEntriesForConsumer(entries.entries, batchSizes, sendMessageInfo, + null, null, false, null); + + assertEquals(size, MANY_ENTRIES_COUNT); + assertEquals(this.helper.getFilterInvocationCount(), 0); + assertEquals(sendMessageInfo.getTotalMessages(), entries.expectedMessages); + assertEquals(sendMessageInfo.getTotalBytes(), entries.expectedBytes); + assertEquals(sendMessageInfo.getTotalChunkedMessages(), 0); + for (int i = 0; i < MANY_ENTRIES_COUNT; i++) { + assertEquals(batchSizes.getBatchSize(i), batchSizeForEntry(i)); + } + } finally { + entries.release(); + batchSizes.recyle(); + } + } + @Test public void testFilterEntriesForConsumerKeepsNullAckSetAbsentForManyEntries() { when(svcConfig.isDispatchThrottlingForFilteredEntriesEnabled()).thenReturn(false); @@ -115,7 +139,6 @@ public void testFilterEntriesForConsumerKeepsNullAckSetAbsentForManyEntries() { } } - @Test public void testFilterEntriesForConsumerOfEntryFilter() throws Exception { Topic mockTopic = mock(Topic.class); @@ -311,6 +334,7 @@ private ByteBuf createDelayedMessage(String message, int sequenceId) { private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher { private final Optional dispatchRateLimiter; + private int filterInvocationCount; protected AbstractBaseDispatcherTestHelper(Subscription subscription, ServiceConfiguration serviceConfig, @@ -324,6 +348,17 @@ public Optional getRateLimiter() { return dispatchRateLimiter; } + @Override + public EntryFilter.FilterResult runFiltersForEntry(Entry entry, MessageMetadata msgMetadata, + Consumer consumer) { + filterInvocationCount++; + return super.runFiltersForEntry(entry, msgMetadata, consumer); + } + + int getFilterInvocationCount() { + return filterInvocationCount; + } + @Override protected boolean isConsumersExceededOnSubscription() { return false;