Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
int filteredMessageCount = 0;
int filteredEntryCount = 0;
long filteredBytesCount = 0;
final boolean hasFilter = this.hasFilter;
List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
List<Position> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Expand All @@ -161,29 +162,27 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
this.filterProcessedMsgs.add(entryMsgCnt);
}

EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRejectedMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add(entry.getPosition());
entries.set(i, null);
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRescheduledMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
if (hasFilter) {
EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
this.filterRejectedMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add(entry.getPosition());
entries.set(i, null);
this.filterRescheduledMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
}
}
if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,30 @@ public void testFilterEntriesForConsumerOfNullElement() {
assertEquals(size, 0);
}

@Test
public void testFilterEntriesForConsumerSkipsFilterWhenNoFiltersConfigured() {
EntriesAndExpectedMetadata entries = createEntriesWithVaryingBatchSizes(MANY_ENTRIES_COUNT);

SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.entries.size());
try {
int size = this.helper.filterEntriesForConsumer(entries.entries, batchSizes, sendMessageInfo,
null, null, false, null);

assertEquals(size, MANY_ENTRIES_COUNT);
assertEquals(this.helper.getFilterInvocationCount(), 0);
assertEquals(sendMessageInfo.getTotalMessages(), entries.expectedMessages);
assertEquals(sendMessageInfo.getTotalBytes(), entries.expectedBytes);
assertEquals(sendMessageInfo.getTotalChunkedMessages(), 0);
for (int i = 0; i < MANY_ENTRIES_COUNT; i++) {
assertEquals(batchSizes.getBatchSize(i), batchSizeForEntry(i));
}
} finally {
entries.release();
batchSizes.recyle();
}
}

@Test
public void testFilterEntriesForConsumerKeepsNullAckSetAbsentForManyEntries() {
when(svcConfig.isDispatchThrottlingForFilteredEntriesEnabled()).thenReturn(false);
Expand Down Expand Up @@ -115,7 +139,6 @@ public void testFilterEntriesForConsumerKeepsNullAckSetAbsentForManyEntries() {
}
}


@Test
public void testFilterEntriesForConsumerOfEntryFilter() throws Exception {
Topic mockTopic = mock(Topic.class);
Expand Down Expand Up @@ -311,6 +334,7 @@ private ByteBuf createDelayedMessage(String message, int sequenceId) {
private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher {

private final Optional<DispatchRateLimiter> dispatchRateLimiter;
private int filterInvocationCount;

protected AbstractBaseDispatcherTestHelper(Subscription subscription,
ServiceConfiguration serviceConfig,
Expand All @@ -324,6 +348,17 @@ public Optional<DispatchRateLimiter> getRateLimiter() {
return dispatchRateLimiter;
}

@Override
public EntryFilter.FilterResult runFiltersForEntry(Entry entry, MessageMetadata msgMetadata,
Consumer consumer) {
filterInvocationCount++;
return super.runFiltersForEntry(entry, msgMetadata, consumer);
}

int getFilterInvocationCount() {
return filterInvocationCount;
}

@Override
protected boolean isConsumersExceededOnSubscription() {
return false;
Expand Down