From ba8c7646395927828db805415ff6b485eab4f8fb Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Thu, 18 Jun 2026 14:14:15 +0800 Subject: [PATCH] [improve][broker] Reduce base dispatcher fast-path overhead --- .../service/AbstractBaseDispatcher.java | 45 +++++----- .../service/AbstractBaseDispatcherTest.java | 84 +++++++++++++++++++ 2 files changed, 106 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index e57261bba2ee5..b09e48182ee53 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -135,6 +135,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i int filteredMessageCount = 0; int filteredEntryCount = 0; long filteredBytesCount = 0; + final boolean hasFilter = this.hasFilter; List entriesToFiltered = hasFilter ? new ArrayList<>() : null; List entriesToRedeliver = hasFilter ? new ArrayList<>() : null; for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) { @@ -161,29 +162,27 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i this.filterProcessedMsgs.add(entryMsgCnt); } - EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer); - if (filterResult == EntryFilter.FilterResult.REJECT) { - entriesToFiltered.add(entry.getPosition()); - entries.set(i, null); - // FilterResult will be always `ACCEPTED` when there is No Filter - // dont need to judge whether `hasFilter` is true or not. - this.filterRejectedMsgs.add(entryMsgCnt); - filteredEntryCount++; - filteredMessageCount += entryMsgCnt; - filteredBytesCount += metadataAndPayload.readableBytes(); - entry.release(); - continue; - } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) { - entriesToRedeliver.add(entry.getPosition()); - entries.set(i, null); - // FilterResult will be always `ACCEPTED` when there is No Filter - // dont need to judge whether `hasFilter` is true or not. - this.filterRescheduledMsgs.add(entryMsgCnt); - filteredEntryCount++; - filteredMessageCount += entryMsgCnt; - filteredBytesCount += metadataAndPayload.readableBytes(); - entry.release(); - continue; + if (hasFilter) { + EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer); + if (filterResult == EntryFilter.FilterResult.REJECT) { + entriesToFiltered.add(entry.getPosition()); + entries.set(i, null); + this.filterRejectedMsgs.add(entryMsgCnt); + filteredEntryCount++; + filteredMessageCount += entryMsgCnt; + filteredBytesCount += metadataAndPayload.readableBytes(); + entry.release(); + continue; + } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) { + entriesToRedeliver.add(entry.getPosition()); + entries.set(i, null); + this.filterRescheduledMsgs.add(entryMsgCnt); + filteredEntryCount++; + filteredMessageCount += entryMsgCnt; + filteredBytesCount += metadataAndPayload.readableBytes(); + entry.release(); + continue; + } } if (msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java index f972f176ece5d..a22f8d3578346 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java @@ -56,6 +56,8 @@ @Test(groups = "broker") public class AbstractBaseDispatcherTest { + private static final int MANY_ENTRIES_COUNT = 512; + private AbstractBaseDispatcherTestHelper helper; private ServiceConfiguration svcConfig; @@ -82,6 +84,29 @@ public void testFilterEntriesForConsumerOfNullElement() { assertEquals(size, 0); } + @Test + public void testFilterEntriesForConsumerSkipsFilterWhenNoFiltersConfigured() { + EntriesAndExpectedMetadata entries = createEntriesWithVaryingBatchSizes(MANY_ENTRIES_COUNT); + + SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); + EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.entries.size()); + try { + int size = this.helper.filterEntriesForConsumer(entries.entries, batchSizes, sendMessageInfo, + null, null, false, null); + + assertEquals(size, MANY_ENTRIES_COUNT); + assertEquals(this.helper.getFilterInvocationCount(), 0); + assertEquals(sendMessageInfo.getTotalMessages(), entries.expectedMessages); + assertEquals(sendMessageInfo.getTotalBytes(), entries.expectedBytes); + assertEquals(sendMessageInfo.getTotalChunkedMessages(), 0); + for (int i = 0; i < MANY_ENTRIES_COUNT; i++) { + assertEquals(batchSizes.getBatchSize(i), batchSizeForEntry(i)); + } + } finally { + entries.release(); + batchSizes.recyle(); + } + } @Test public void testFilterEntriesForConsumerOfEntryFilter() throws Exception { @@ -192,6 +217,53 @@ private ByteBuf createMessage(String message, int sequenceId) { Unpooled.copiedBuffer(message.getBytes(UTF_8))); } + private ByteBuf createBatchMessage(String message, int sequenceId, int batchSize) { + MessageMetadata messageMetadata = new MessageMetadata() + .setSequenceId(sequenceId) + .setProducerName("testProducer") + .setPartitionKeyB64Encoded(false) + .setPublishTime(System.currentTimeMillis()) + .setNumMessagesInBatch(batchSize); + return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, + Unpooled.copiedBuffer(message.getBytes(UTF_8))); + } + + private EntriesAndExpectedMetadata createEntriesWithVaryingBatchSizes(int entryCount) { + EntriesAndExpectedMetadata entries = new EntriesAndExpectedMetadata(entryCount); + for (int i = 0; i < entryCount; i++) { + int batchSize = batchSizeForEntry(i); + ByteBuf message = createBatchMessage(messagePayload(i), i, batchSize); + Entry entry = EntryImpl.create(1, i, message); + message.release(); + entries.entries.add(entry); + entries.expectedMessages += batchSize; + entries.expectedBytes += entry.getLength(); + } + return entries; + } + + private static int batchSizeForEntry(int index) { + return 1 + (index % 10); + } + + private static String messagePayload(int index) { + return "message-" + index + "-" + "x".repeat(1 + (index % 128)); + } + + private static final class EntriesAndExpectedMetadata { + private final List entries; + private int expectedMessages; + private long expectedBytes; + + private EntriesAndExpectedMetadata(int entryCount) { + this.entries = new ArrayList<>(entryCount); + } + + private void release() { + entries.forEach(Entry::release); + } + } + private ByteBuf createTnxMessage(String message, int sequenceId) { MessageMetadata messageMetadata = new MessageMetadata() .setSequenceId(sequenceId) @@ -231,6 +303,7 @@ private ByteBuf createDelayedMessage(String message, int sequenceId) { private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher { private final Optional dispatchRateLimiter; + private int filterInvocationCount; protected AbstractBaseDispatcherTestHelper(Subscription subscription, ServiceConfiguration serviceConfig, @@ -244,6 +317,17 @@ public Optional getRateLimiter() { return dispatchRateLimiter; } + @Override + public EntryFilter.FilterResult runFiltersForEntry(Entry entry, MessageMetadata msgMetadata, + Consumer consumer) { + filterInvocationCount++; + return super.runFiltersForEntry(entry, msgMetadata, consumer); + } + + int getFilterInvocationCount() { + return filterInvocationCount; + } + @Override protected boolean isConsumersExceededOnSubscription() { return false;