diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/AbstractPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/AbstractPersistentDispatcherMultipleConsumers.java index 79d365b9fee21..3a63dc6594704 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/AbstractPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/AbstractPersistentDispatcherMultipleConsumers.java @@ -18,8 +18,10 @@ */ package org.apache.pulsar.broker.service.persistent; +import java.util.List; import java.util.Map; import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; @@ -64,4 +66,12 @@ public AbstractPersistentDispatcherMultipleConsumers(Subscription subscription, public abstract Map getBucketDelayedIndexStats(); public abstract boolean isClassic(); + + static long getTotalBytesSize(List entries) { + long totalBytesSize = 0; + for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) { + totalBytesSize += entries.get(i).getLength(); + } + return totalBytesSize; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 9bcf9153572a9..d17321b1b9bb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -702,7 +702,7 @@ public final synchronized void readEntriesComplete(List entries, Object c .attr("consumerCount", consumerList.size()) .log("Distributing messages to consumers"); - long totalBytesSize = entries.stream().mapToLong(Entry::getLength).sum(); + long totalBytesSize = getTotalBytesSize(entries); updatePendingBytesToDispatch(totalBytesSize); // dispatch messages to a separate thread, but still in order for this subscription diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 276f8c038a67c..d30ce857641bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -614,7 +614,7 @@ public final synchronized void readEntriesComplete(List entries, Object c .attr("consumerCount", consumerList.size()) .log("Distributing messages to consumers"); - long size = entries.stream().mapToLong(Entry::getLength).sum(); + long size = getTotalBytesSize(entries); updatePendingBytesToDispatch(size); // dispatch messages to a separate thread, but still in order for this subscription diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherTotalBytesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherTotalBytesTest.java new file mode 100644 index 0000000000000..239a698ab011b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherTotalBytesTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.testng.Assert.assertEquals; +import java.util.ArrayList; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class PersistentDispatcherTotalBytesTest { + + @DataProvider(name = "entryCounts") + public Object[][] entryCounts() { + return new Object[][] { + {0}, + {1}, + {32}, + {1024} + }; + } + + @Test(dataProvider = "entryCounts") + public void testGetTotalBytesSize(int entryCount) { + EntriesAndExpectedSize entries = entriesWithVaryingPayloadSizes(entryCount); + try { + assertEquals(AbstractPersistentDispatcherMultipleConsumers.getTotalBytesSize(entries.entries), + entries.expectedSize); + } finally { + entries.release(); + } + } + + private static EntriesAndExpectedSize entriesWithVaryingPayloadSizes(int entryCount) { + EntriesAndExpectedSize entries = new EntriesAndExpectedSize(entryCount); + for (int i = 0; i < entryCount; i++) { + int payloadSize = payloadSize(i); + entries.entries.add(EntryImpl.create(1, i, new byte[payloadSize])); + entries.expectedSize += payloadSize; + } + return entries; + } + + private static int payloadSize(int index) { + return index % 97 == 0 ? 4096 : 1 + ((index * 31) & 1023); + } + + private static final class EntriesAndExpectedSize { + private final ArrayList entries; + private long expectedSize; + + private EntriesAndExpectedSize(int entryCount) { + this.entries = new ArrayList<>(entryCount); + } + + private void release() { + entries.forEach(Entry::release); + } + } +}