From 952da150eed77ed5a119196d341d49ec9be0713e Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Sat, 13 Jun 2026 23:23:34 +0800 Subject: [PATCH 1/6] [improve][broker] Cut PendingAcksMap per-entry overhead --- .../service/PendingAcksMapBenchmark.java | 434 ++++++++++++++++++ .../pulsar/broker/service/package-info.java | 23 + .../pulsar/broker/service/Consumer.java | 46 +- .../pulsar/broker/service/PendingAcksMap.java | 261 ++++++++--- .../broker/service/PendingAcksMapTest.java | 193 +++++++- .../common/util/collections/Long2LongMap.java | 121 +++++ .../collections/Long2LongOpenHashMap.java | 229 +++++++++ .../collections/Long2LongOpenHashMapTest.java | 204 ++++++++ 8 files changed, 1416 insertions(+), 95 deletions(-) create mode 100644 microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java create mode 100644 microbench/src/main/java/org/apache/pulsar/broker/service/package-info.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2LongMap.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMap.java create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMapTest.java diff --git a/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java new file mode 100644 index 0000000000000..9b9444c9abe43 --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.service; + +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.pulsar.common.util.collections.IntIntPair; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@BenchmarkMode(Mode.AverageTime) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS) +public class PendingAcksMapBenchmark { + private static final int PENDING_ACK_NOT_FOUND = -1; + + @Benchmark + public int getRemainingUnackedHit(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + return state.store.getRemainingUnacked(state.ledgerIds[index], state.entryIds[index]); + } + + @Benchmark + public boolean containsHit(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + return state.store.contains(state.ledgerIds[index], state.entryIds[index]); + } + + @Benchmark + public boolean addOrReplace(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + return state.store.addPendingAckIfAllowed(state.ledgerIds[index], state.entryIds[index], + remaining(index), stickyKeyHash(index)); + } + + @Benchmark + public boolean updateRemainingUnacked(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + return state.store.updateRemainingUnacked(state.ledgerIds[index], state.entryIds[index], 1); + } + + @Benchmark + public int removeAndAddRemaining(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + long ledgerId = state.ledgerIds[index]; + long entryId = state.entryIds[index]; + int removed = state.store.removeAndGetRemainingUnacked(ledgerId, entryId); + state.store.addPendingAckIfAllowed(ledgerId, entryId, + removed == PENDING_ACK_NOT_FOUND ? remaining(index) : removed, stickyKeyHash(index)); + return removed; + } + + @Benchmark + public long forEachAll(MapState state) { + return state.store.forEachAll(); + } + + @Benchmark + public long removeAllUpTo(RangeState state) { + return state.store.removeAllUpTo(state.markDeleteLedgerId, state.markDeleteEntryId); + } + + @Benchmark + public void populate(PopulateState state, Blackhole blackhole) { + PendingAckStore store = createStore(state.implementation); + populate(store, state.entries, state.ledgers, null, null); + blackhole.consume(store); + } + + @State(Scope.Benchmark) + public static class MapState { + @Param({"oldProduction", "production"}) + private String implementation; + + @Param({"64kEntries1kLedgers", "1mEntries16kLedgers"}) + private String dataset; + + private PendingAckStore store; + private long[] ledgerIds; + private long[] entryIds; + private int entries; + private int ledgers; + + @Setup(Level.Trial) + public void setup() { + Dataset parsedDataset = Dataset.from(dataset); + entries = parsedDataset.entries; + ledgers = parsedDataset.ledgers; + ledgerIds = new long[entries]; + entryIds = new long[entries]; + store = createStore(implementation); + populate(store, entries, ledgers, ledgerIds, entryIds); + } + } + + @State(Scope.Thread) + public static class RangeState { + @Param({"oldProduction", "production"}) + private String implementation; + + @Param({"64kEntries1kLedgers", "1mEntries16kLedgers"}) + private String dataset; + + private PendingAckStore store; + private long markDeleteLedgerId; + private long markDeleteEntryId; + + @Setup(Level.Invocation) + public void setup() { + Dataset parsedDataset = Dataset.from(dataset); + store = createStore(implementation); + populate(store, parsedDataset.entries, parsedDataset.ledgers, null, null); + markDeleteLedgerId = parsedDataset.ledgers / 2L; + markDeleteEntryId = parsedDataset.entries / parsedDataset.ledgers / 2L; + } + } + + @State(Scope.Thread) + public static class PopulateState { + @Param({"oldProduction", "production"}) + private String implementation; + + @Param({"64kEntries1kLedgers", "1mEntries16kLedgers"}) + private String dataset; + + private int entries; + private int ledgers; + + @Setup(Level.Trial) + public void setup() { + Dataset parsedDataset = Dataset.from(dataset); + entries = parsedDataset.entries; + ledgers = parsedDataset.ledgers; + } + } + + @State(Scope.Thread) + public static class CursorState { + private int index; + + private int next(int entries) { + int current = index; + index = current + 1; + return current & (entries - 1); + } + } + + private enum Dataset { + ENTRIES_64K_LEDGERS_1K("64kEntries1kLedgers", 65_536, 1_024), + ENTRIES_1M_LEDGERS_16K("1mEntries16kLedgers", 1_048_576, 16_384); + + private final String name; + private final int entries; + private final int ledgers; + + Dataset(String name, int entries, int ledgers) { + this.name = name; + this.entries = entries; + this.ledgers = ledgers; + } + + private static Dataset from(String name) { + for (Dataset dataset : values()) { + if (dataset.name.equals(name)) { + return dataset; + } + } + throw new IllegalArgumentException("Unknown dataset: " + name); + } + } + + private interface PendingAckStore { + boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash); + + boolean contains(long ledgerId, long entryId); + + int getRemainingUnacked(long ledgerId, long entryId); + + boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta); + + int removeAndGetRemainingUnacked(long ledgerId, long entryId); + + long forEachAll(); + + long removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId); + } + + private static PendingAckStore createStore(String implementation) { + return switch (implementation) { + case "oldProduction" -> new OldProductionPendingAckStore(); + case "production" -> new ProductionPendingAckStore(); + default -> throw new IllegalArgumentException("Unknown implementation: " + implementation); + }; + } + + private static void populate(PendingAckStore store, int entries, int ledgers, + long[] ledgerIds, long[] entryIds) { + for (int i = 0; i < entries; i++) { + long ledgerId = i % ledgers; + long entryId = i / ledgers; + if (ledgerIds != null) { + ledgerIds[i] = ledgerId; + entryIds[i] = entryId; + } + store.addPendingAckIfAllowed(ledgerId, entryId, remaining(i), stickyKeyHash(i)); + } + } + + private static int remaining(int index) { + return (index & 15) + 1; + } + + private static int stickyKeyHash(int index) { + return index * 31; + } + + private static final class ProductionPendingAckStore implements PendingAckStore { + private final PendingAcksMap pendingAcks = new PendingAcksMap(null, () -> null, () -> null); + + @Override + public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remainingUnacked, + int stickyKeyHash) { + return pendingAcks.addPendingAckIfAllowed(ledgerId, entryId, remainingUnacked, stickyKeyHash); + } + + @Override + public boolean contains(long ledgerId, long entryId) { + return pendingAcks.contains(ledgerId, entryId); + } + + @Override + public int getRemainingUnacked(long ledgerId, long entryId) { + return pendingAcks.getRemainingUnacked(ledgerId, entryId); + } + + @Override + public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { + return pendingAcks.updateRemainingUnacked(ledgerId, entryId, ackedDelta); + } + + @Override + public int removeAndGetRemainingUnacked(long ledgerId, long entryId) { + return pendingAcks.removeAndGetRemainingUnacked(ledgerId, entryId); + } + + @Override + public long forEachAll() { + long[] total = new long[1]; + pendingAcks.forEach((ledgerId, entryId, remainingUnacked, stickyKeyHash) -> + total[0] += remainingUnacked + stickyKeyHash); + return total[0]; + } + + @Override + public long removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) { + long[] total = new long[1]; + pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId, + (ledgerId, entryId, remainingUnacked, stickyKeyHash) -> total[0] += remainingUnacked); + return total[0]; + } + } + + private static final class OldProductionPendingAckStore implements PendingAckStore { + private final TreeMap> pendingAcks = new TreeMap<>(); + private final Lock readLock; + private final Lock writeLock; + + private OldProductionPendingAckStore() { + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + writeLock = readWriteLock.writeLock(); + readLock = readWriteLock.readLock(); + } + + @Override + public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remainingUnacked, + int stickyKeyHash) { + try { + writeLock.lock(); + TreeMap ledgerPendingAcks = + pendingAcks.computeIfAbsent(ledgerId, k -> new TreeMap<>()); + ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked, stickyKeyHash)); + return true; + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean contains(long ledgerId, long entryId) { + try { + readLock.lock(); + TreeMap ledgerMap = pendingAcks.get(ledgerId); + return ledgerMap != null && ledgerMap.containsKey(entryId); + } finally { + readLock.unlock(); + } + } + + @Override + public int getRemainingUnacked(long ledgerId, long entryId) { + try { + readLock.lock(); + TreeMap ledgerMap = pendingAcks.get(ledgerId); + IntIntPair value = ledgerMap == null ? null : ledgerMap.get(entryId); + return value == null ? PENDING_ACK_NOT_FOUND : value.leftInt(); + } finally { + readLock.unlock(); + } + } + + @Override + public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { + try { + writeLock.lock(); + TreeMap ledgerMap = pendingAcks.get(ledgerId); + IntIntPair value = ledgerMap == null ? null : ledgerMap.get(entryId); + if (value == null) { + return false; + } + ledgerMap.put(entryId, IntIntPair.of(value.leftInt() - ackedDelta, value.rightInt())); + return true; + } finally { + writeLock.unlock(); + } + } + + @Override + public int removeAndGetRemainingUnacked(long ledgerId, long entryId) { + try { + writeLock.lock(); + TreeMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return PENDING_ACK_NOT_FOUND; + } + IntIntPair value = ledgerMap.remove(entryId); + if (value == null) { + return PENDING_ACK_NOT_FOUND; + } + if (ledgerMap.isEmpty()) { + pendingAcks.remove(ledgerId); + } + return value.leftInt(); + } finally { + writeLock.unlock(); + } + } + + @Override + public long forEachAll() { + try { + readLock.lock(); + long total = 0; + for (Map.Entry> ledgerEntry : pendingAcks.entrySet()) { + TreeMap ledgerPendingAcks = ledgerEntry.getValue(); + for (IntIntPair value : ledgerPendingAcks.values()) { + total += value.leftInt() + value.rightInt(); + } + } + return total; + } finally { + readLock.unlock(); + } + } + + @Override + public long removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) { + try { + writeLock.lock(); + long total = 0; + Iterator>> ledgerIterator = + pendingAcks.headMap(markDeleteLedgerId, true).entrySet().iterator(); + while (ledgerIterator.hasNext()) { + Map.Entry> ledgerEntry = ledgerIterator.next(); + long ledgerId = ledgerEntry.getKey(); + TreeMap ledgerMap = ledgerEntry.getValue(); + if (ledgerId < markDeleteLedgerId) { + for (IntIntPair value : ledgerMap.values()) { + total += value.leftInt(); + } + ledgerIterator.remove(); + } else { + Iterator> entryIterator = + ledgerMap.headMap(markDeleteEntryId, true).entrySet().iterator(); + while (entryIterator.hasNext()) { + total += entryIterator.next().getValue().leftInt(); + entryIterator.remove(); + } + if (ledgerMap.isEmpty()) { + ledgerIterator.remove(); + } + } + } + return total; + } finally { + writeLock.unlock(); + } + } + } +} diff --git a/microbench/src/main/java/org/apache/pulsar/broker/service/package-info.java b/microbench/src/main/java/org/apache/pulsar/broker/service/package-info.java new file mode 100644 index 0000000000000..5d21901f31bc8 --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/service/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Benchmarks for broker service internals. + */ +package org.apache.pulsar.broker.service; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 0f4708fa39fe7..f74cbdbea1108 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -83,6 +83,7 @@ public class Consumer { private static final Logger LOG = Logger.get(Consumer.class); + private static final int PENDING_ACK_NOT_FOUND = -1; private final Logger log; private final Subscription subscription; @@ -649,10 +650,10 @@ private CompletableFuture individualAck(CommandAck ack, Map addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); } } else if (!hasAckSet) { - IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet( + int removed = ackOwnerConsumer.removePendingAckAndGetRemainingUnacked( position.getLedgerId(), position.getEntryId()); - if (removed != null) { - addAndGetUnAckedMsgs(ackOwnerConsumer, -removed.leftInt()); + if (removed != PENDING_ACK_NOT_FOUND) { + addAndGetUnAckedMsgs(ackOwnerConsumer, -removed); updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); } } @@ -744,10 +745,10 @@ private void applyPendingAckCompletions(List pendingAckCom } } } else { - IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet( + int removed = ackOwnerConsumer.removePendingAckAndGetRemainingUnacked( position.getLedgerId(), position.getEntryId()); - if (removed != null) { - addAndGetUnAckedMsgs(ackOwnerConsumer, -removed.leftInt()); + if (removed != PENDING_ACK_NOT_FOUND) { + addAndGetUnAckedMsgs(ackOwnerConsumer, -removed); } } updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); @@ -841,16 +842,16 @@ private void checkAckValidationError(CommandAck ack, Position position) { */ private ObjectIntPair getAckOwnerConsumerAndBatchSize(long ledgerId, long entryId) { if (Subscription.isIndividualAckMode(subType)) { - IntIntPair pendingAck = getPendingAcks().get(ledgerId, entryId); - if (pendingAck != null) { - return ObjectIntPair.of(this, pendingAck.leftInt()); + int remainingUnacked = getPendingAcks().getRemainingUnacked(ledgerId, entryId); + if (remainingUnacked != PENDING_ACK_NOT_FOUND) { + return ObjectIntPair.of(this, remainingUnacked); } else { // If there are more consumers, this step will consume more CPU, and it should be optimized later. for (Consumer consumer : subscription.getConsumers()) { if (consumer != this) { - pendingAck = consumer.getPendingAcks().get(ledgerId, entryId); - if (pendingAck != null) { - return ObjectIntPair.of(consumer, pendingAck.leftInt()); + remainingUnacked = consumer.getPendingAcks().getRemainingUnacked(ledgerId, entryId); + if (remainingUnacked != PENDING_ACK_NOT_FOUND) { + return ObjectIntPair.of(consumer, remainingUnacked); } } } @@ -1207,6 +1208,20 @@ public IntIntPair removePendingAckAndGet(long ledgerId, long entryId) { return null; } + /** + * Atomically remove the pending ack entry and return its remaining unacked count. + * + *

No-op if {@code pendingAcks} is not initialized. + * + * @return the remaining unacked count, or {@code -1} if not found + */ + public int removePendingAckAndGetRemainingUnacked(long ledgerId, long entryId) { + if (pendingAcks != null) { + return pendingAcks.removeAndGetRemainingUnacked(ledgerId, entryId); + } + return PENDING_ACK_NOT_FOUND; + } + /** * Remove all pending acks up to the given mark-delete position and decrement the consumer's unacked message * counter by the remaining unacked count for each removed entry. @@ -1273,9 +1288,10 @@ public void redeliverUnacknowledgedMessages(List messageIds) { List pendingPositions = new ArrayList<>(); for (MessageIdData msg : messageIds) { Position position = PositionFactory.create(msg.getLedgerId(), msg.getEntryId()); - IntIntPair pendingAck = pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId()); - if (pendingAck != null) { - totalRedeliveryMessages += pendingAck.leftInt(); + int remainingUnacked = pendingAcks.removeAndGetRemainingUnacked( + position.getLedgerId(), position.getEntryId()); + if (remainingUnacked != PENDING_ACK_NOT_FOUND) { + totalRedeliveryMessages += remainingUnacked; pendingPositions.add(position); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 70f1a7dd247c8..9a482a979caf3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import org.apache.pulsar.common.util.collections.IntIntPair; +import org.apache.pulsar.common.util.collections.Long2LongOpenHashMap; /** * A thread-safe map to store pending acks in the consumer. @@ -97,11 +98,19 @@ public interface PendingAcksConsumer { } private final Consumer consumer; - private final TreeMap> pendingAcks; + private final TreeMap pendingAcks; private final Supplier pendingAcksAddHandlerSupplier; private final Supplier pendingAcksRemoveHandlerSupplier; private final Lock readLock; private final Lock writeLock; + private static final int PENDING_ACK_NOT_FOUND = -1; + /* + * Pending ack values are stored as a packed long to avoid allocating an IntIntPair per entry. + * The high 32 bits contain remainingUnacked and the low 32 bits contain stickyKeyHash. + * Long.MIN_VALUE is reserved for missing entries; remainingUnacked is a non-negative count in normal use, so + * the packed representation cannot collide with this sentinel. + */ + private static final long PACKED_PENDING_ACK_NOT_FOUND = Long.MIN_VALUE; private boolean closed = false; PendingAcksMap(Consumer consumer, Supplier pendingAcksAddHandlerSupplier, @@ -143,9 +152,9 @@ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remaining && !pendingAcksAddHandler.handleAdding(consumer, ledgerId, entryId, stickyKeyHash)) { return false; } - TreeMap ledgerPendingAcks = - pendingAcks.computeIfAbsent(ledgerId, k -> new TreeMap<>()); - ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked, stickyKeyHash)); + Long2LongOpenHashMap ledgerPendingAcks = + pendingAcks.computeIfAbsent(ledgerId, k -> new Long2LongOpenHashMap()); + ledgerPendingAcks.put(entryId, packPendingAckValue(remainingUnacked, stickyKeyHash)); return true; } finally { writeLock.unlock(); @@ -160,7 +169,7 @@ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remaining public long size() { try { readLock.lock(); - return pendingAcks.values().stream().mapToInt(TreeMap::size).sum(); + return pendingAcks.values().stream().mapToInt(Long2LongOpenHashMap::size).sum(); } finally { readLock.unlock(); } @@ -184,16 +193,13 @@ public void forEach(PendingAcksConsumer processor) { private void processPendingAcks(PendingAcksConsumer processor) { // this code uses for loops intentionally, don't refactor to use forEach // iterate the outer map - for (Map.Entry> entry : pendingAcks.entrySet()) { + for (Map.Entry entry : pendingAcks.entrySet()) { long ledgerId = entry.getKey(); - TreeMap ledgerPendingAcks = entry.getValue(); + Long2LongOpenHashMap ledgerPendingAcks = entry.getValue(); // iterate the inner map - for (Map.Entry e : ledgerPendingAcks.entrySet()) { - long entryId = e.getKey(); - IntIntPair batchSizeAndStickyKeyHash = e.getValue(); - processor.accept(ledgerId, entryId, batchSizeAndStickyKeyHash.leftInt(), - batchSizeAndStickyKeyHash.rightInt()); - } + ledgerPendingAcks.forEach((entryId, packedValue) -> + processor.accept(ledgerId, entryId, unpackRemainingUnacked(packedValue), + unpackStickyKeyHash(packedValue))); } } @@ -254,7 +260,7 @@ private void internalForEachAndClear(PendingAcksConsumer processor, boolean clos public boolean contains(long ledgerId, long entryId) { try { readLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -274,11 +280,34 @@ public boolean contains(long ledgerId, long entryId) { public IntIntPair get(long ledgerId, long entryId) { try { readLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return null; } - return ledgerMap.get(entryId); + long packedValue = getPackedPendingAckOrNotFound(ledgerMap, entryId); + return isPackedPendingAckNotFound(packedValue) ? null : unpackPendingAckValue(packedValue); + } finally { + readLock.unlock(); + } + } + + /** + * Get the remaining unacked count for the given ledger ID and entry ID. + * + * @param ledgerId the ledger ID + * @param entryId the entry ID + * @return the remaining unacked count, or -1 if not found + */ + public int getRemainingUnacked(long ledgerId, long entryId) { + try { + readLock.lock(); + Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return PENDING_ACK_NOT_FOUND; + } + long packedValue = getPackedPendingAckOrNotFound(ledgerMap, entryId); + return isPackedPendingAckNotFound(packedValue) + ? PENDING_ACK_NOT_FOUND : unpackRemainingUnacked(packedValue); } finally { readLock.unlock(); } @@ -296,12 +325,18 @@ public IntIntPair get(long ledgerId, long entryId) { public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyHash) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } - boolean removed = ledgerMap.remove(entryId, IntIntPair.of(batchSize, stickyKeyHash)); + long packedValue = getPackedPendingAckOrNotFound(ledgerMap, entryId); + if (isPackedPendingAckNotFound(packedValue)) { + return false; + } + boolean removed = unpackRemainingUnacked(packedValue) == batchSize + && unpackStickyKeyHash(packedValue) == stickyKeyHash; if (removed) { + ledgerMap.remove(entryId); handleRemovePendingAck(ledgerId, entryId, stickyKeyHash); } if (removed && ledgerMap.isEmpty()) { @@ -325,16 +360,16 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } - IntIntPair current = ledgerMap.get(entryId); - if (current == null) { + long packedValue = getPackedPendingAckOrNotFound(ledgerMap, entryId); + if (isPackedPendingAckNotFound(packedValue)) { return false; } - int newRemaining = current.leftInt() - ackedDelta; - ledgerMap.put(entryId, IntIntPair.of(newRemaining, current.rightInt())); + int newRemaining = unpackRemainingUnacked(packedValue) - ackedDelta; + ledgerMap.put(entryId, packPendingAckValue(newRemaining, unpackStickyKeyHash(packedValue))); return true; } finally { writeLock.unlock(); @@ -351,20 +386,20 @@ public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelt public boolean remove(long ledgerId, long entryId) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } - IntIntPair removedEntry = ledgerMap.remove(entryId); - boolean removed = removedEntry != null; - if (removed) { - int stickyKeyHash = removedEntry.rightInt(); - handleRemovePendingAck(ledgerId, entryId, stickyKeyHash); + long removedEntry = getPackedPendingAckOrNotFound(ledgerMap, entryId); + if (isPackedPendingAckNotFound(removedEntry)) { + return false; } - if (removed && ledgerMap.isEmpty()) { + ledgerMap.remove(entryId); + handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); + if (ledgerMap.isEmpty()) { pendingAcks.remove(ledgerId); } - return removed; + return true; } finally { writeLock.unlock(); } @@ -382,23 +417,55 @@ public boolean remove(long ledgerId, long entryId) { public IntIntPair removeAndGet(long ledgerId, long entryId) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return null; } - IntIntPair removedEntry = ledgerMap.remove(entryId); - if (removedEntry != null) { - handleRemovePendingAck(ledgerId, entryId, removedEntry.rightInt()); + long removedEntry = getPackedPendingAckOrNotFound(ledgerMap, entryId); + if (isPackedPendingAckNotFound(removedEntry)) { + return null; + } + ledgerMap.remove(entryId); + handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); + if (ledgerMap.isEmpty()) { + pendingAcks.remove(ledgerId); + } + return unpackPendingAckValue(removedEntry); + } finally { + writeLock.unlock(); + } + } + + /** + * Atomically remove and return the remaining unacked count for the given ledger ID and entry ID. + * + * @param ledgerId the ledger ID + * @param entryId the entry ID + * @return the remaining unacked count, or -1 if not found + */ + public int removeAndGetRemainingUnacked(long ledgerId, long entryId) { + try { + writeLock.lock(); + Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return PENDING_ACK_NOT_FOUND; + } + long removedEntry = getPackedPendingAckOrNotFound(ledgerMap, entryId); + if (isPackedPendingAckNotFound(removedEntry)) { + return PENDING_ACK_NOT_FOUND; } - if (removedEntry != null && ledgerMap.isEmpty()) { + ledgerMap.remove(entryId); + handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); + if (ledgerMap.isEmpty()) { pendingAcks.remove(ledgerId); } - return removedEntry; + return unpackRemainingUnacked(removedEntry); } finally { writeLock.unlock(); } } + /** * Remove all pending acks up to the given ledger ID and entry ID, invoking a callback for each removed entry. * @@ -440,52 +507,67 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } else { readLock.lock(); } - Iterator>> ledgerMapIterator = - pendingAcks.headMap(markDeleteLedgerId + 1).entrySet().iterator(); + Iterator> ledgerMapIterator = + pendingAcks.headMap(markDeleteLedgerId, true).entrySet().iterator(); while (ledgerMapIterator.hasNext()) { - Map.Entry> entry = ledgerMapIterator.next(); + Map.Entry entry = ledgerMapIterator.next(); long ledgerId = entry.getKey(); - TreeMap ledgerMap = entry.getValue(); - TreeMap ledgerMapHead; - if (ledgerId == markDeleteLedgerId) { - ledgerMapHead = new TreeMap<>(ledgerMap.headMap(markDeleteEntryId + 1)); - } else { - ledgerMapHead = ledgerMap; - } - Iterator> entryMapIterator = - ledgerMapHead.entrySet().iterator(); - while (entryMapIterator.hasNext()) { - Map.Entry intIntPairEntry = entryMapIterator.next(); - long entryId = intIntPairEntry.getKey(); - if (!acquiredWriteLock) { + Long2LongOpenHashMap ledgerMap = entry.getValue(); + if (!acquiredWriteLock) { + if (ledgerId < markDeleteLedgerId && !ledgerMap.isEmpty()) { retryWithWriteLock = true; return; } - IntIntPair value = intIntPairEntry.getValue(); - int batchSize = value.leftInt(); - int stickyKeyHash = value.rightInt(); - if (pendingAcksRemoveHandler != null) { + if (ledgerId == markDeleteLedgerId && containsEntryUpTo(ledgerMap, markDeleteEntryId)) { + retryWithWriteLock = true; + return; + } + continue; + } + if (ledgerId < markDeleteLedgerId) { + if (!ledgerMap.isEmpty() && pendingAcksRemoveHandler != null) { if (!batchStarted) { pendingAcksRemoveHandler.startBatch(); batchStarted = true; } - pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, stickyKeyHash, closed); - } - if (removedEntryCallback != null) { - removedEntryCallback.accept(ledgerId, entryId, batchSize, stickyKeyHash); - } - entryMapIterator.remove(); - // also remove from the original map if we're iterating a copy - if (ledgerId == markDeleteLedgerId) { - ledgerMap.remove(entryId); - } - } - if (ledgerMap.isEmpty()) { - if (!acquiredWriteLock) { - retryWithWriteLock = true; - return; } + ledgerMap.forEach((entryId, packedValue) -> { + int stickyKeyHash = unpackStickyKeyHash(packedValue); + if (pendingAcksRemoveHandler != null) { + pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, + stickyKeyHash, closed); + } + if (removedEntryCallback != null) { + removedEntryCallback.accept(ledgerId, entryId, + unpackRemainingUnacked(packedValue), stickyKeyHash); + } + }); ledgerMapIterator.remove(); + } else { + boolean[] batchStartedHolder = new boolean[]{batchStarted}; + int removed = ledgerMap.removeIf((entryId, packedValue) -> { + if (entryId > markDeleteEntryId) { + return false; + } + int stickyKeyHash = unpackStickyKeyHash(packedValue); + if (pendingAcksRemoveHandler != null) { + if (!batchStartedHolder[0]) { + pendingAcksRemoveHandler.startBatch(); + batchStartedHolder[0] = true; + } + pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, + stickyKeyHash, closed); + } + if (removedEntryCallback != null) { + removedEntryCallback.accept(ledgerId, entryId, + unpackRemainingUnacked(packedValue), stickyKeyHash); + } + return true; + }); + batchStarted = batchStartedHolder[0]; + if (removed > 0 && ledgerMap.isEmpty()) { + ledgerMapIterator.remove(); + } } } } finally { @@ -503,6 +585,41 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } } + private static boolean containsEntryUpTo(Long2LongOpenHashMap ledgerMap, long maxEntryId) { + boolean[] found = new boolean[1]; + ledgerMap.forEach((entryId, ignoredValue) -> { + if (entryId <= maxEntryId) { + found[0] = true; + } + }); + return found[0]; + } + + // A packed value can legitimately be 0, so do not use Long2LongOpenHashMap.get() for lookups here. + private static long getPackedPendingAckOrNotFound(Long2LongOpenHashMap ledgerMap, long entryId) { + return ledgerMap.getOrDefault(entryId, PACKED_PENDING_ACK_NOT_FOUND); + } + + private static boolean isPackedPendingAckNotFound(long packedValue) { + return packedValue == PACKED_PENDING_ACK_NOT_FOUND; + } + + private static long packPendingAckValue(int remainingUnacked, int stickyKeyHash) { + return ((long) remainingUnacked << Integer.SIZE) | (stickyKeyHash & 0xFFFF_FFFFL); + } + + private static IntIntPair unpackPendingAckValue(long packedValue) { + return IntIntPair.of(unpackRemainingUnacked(packedValue), unpackStickyKeyHash(packedValue)); + } + + private static int unpackRemainingUnacked(long packedValue) { + return (int) (packedValue >> Integer.SIZE); + } + + private static int unpackStickyKeyHash(long packedValue) { + return (int) packedValue; + } + private void handleRemovePendingAck(long ledgerId, long entryId, int stickyKeyHash) { PendingAcksRemoveHandler pendingAcksRemoveHandler = pendingAcksRemoveHandlerSupplier.get(); if (pendingAcksRemoveHandler != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index 02bf098561c1d..c22ba403b0f59 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -30,7 +30,9 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.pulsar.common.util.collections.IntIntPair; import org.testng.annotations.Test; @@ -81,7 +83,8 @@ public void forEach_ProcessesAllPendingAcks() { List processedEntries = new ArrayList<>(); pendingAcksMap.forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> processedEntries.add(entryId)); - assertEquals(processedEntries, List.of(1L, 2L)); + assertEquals(processedEntries.size(), 2); + assertEquals(new HashSet<>(processedEntries), Set.of(1L, 2L)); } @Test @@ -94,7 +97,8 @@ public void forEachAndClose_ProcessesAndClearsAllPendingAcks() { List processedEntries = new ArrayList<>(); pendingAcksMap.forEachAndClose((ledgerId, entryId, batchSize, stickyKeyHash) -> processedEntries.add(entryId)); - assertEquals(processedEntries, List.of(1L, 2L)); + assertEquals(processedEntries.size(), 2); + assertEquals(new HashSet<>(processedEntries), Set.of(1L, 2L)); assertEquals(pendingAcksMap.size(), 0); } @@ -195,15 +199,14 @@ public void removeAllUpToWithCallback_InvokesCallbackForEachRemovedEntry() { pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 5, 124); pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 7, 125); - List callbackInvocations = new ArrayList<>(); + List callbackInvocations = new ArrayList<>(); pendingAcksMap.removeAllUpTo(1L, 2L, (ledgerId, entryId, batchSize, stickyKeyHash) -> { - callbackInvocations.add(new int[]{(int) ledgerId, (int) entryId, batchSize, stickyKeyHash}); + callbackInvocations.add(ledgerId + ":" + entryId + ":" + batchSize + ":" + stickyKeyHash); }); assertEquals(callbackInvocations.size(), 2); - assertEquals(callbackInvocations.get(0), new int[]{1, 1, 3, 123}); - assertEquals(callbackInvocations.get(1), new int[]{1, 2, 5, 124}); + assertEquals(new HashSet<>(callbackInvocations), Set.of("1:1:3:123", "1:2:5:124")); assertFalse(pendingAcksMap.contains(1L, 1L)); assertFalse(pendingAcksMap.contains(1L, 2L)); assertTrue(pendingAcksMap.contains(2L, 1L)); @@ -230,7 +233,8 @@ public void forEachAndClear_ProcessesAndClearsAllPendingAcks() { List processedEntries = new ArrayList<>(); pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> processedEntries.add(entryId)); - assertEquals(processedEntries, List.of(1L, 2L)); + assertEquals(processedEntries.size(), 2); + assertEquals(new HashSet<>(processedEntries), Set.of(1L, 2L)); assertEquals(pendingAcksMap.size(), 0); } @@ -299,4 +303,177 @@ public void removeAndGet_InvokesRemoveHandler() { verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false); } -} \ No newline at end of file + + @Test + public void packedPendingAckFields_RoundTripThroughPublicAccessors() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + int[][] values = new int[][] { + {0, 0}, + {0, -1}, + {1, Integer.MIN_VALUE}, + {1, Integer.MAX_VALUE}, + {Integer.MAX_VALUE, -123456789}, + {123456789, Integer.MIN_VALUE}, + {42, 0x80000001}, + }; + + Set expectedEntries = new HashSet<>(); + for (int i = 0; i < values.length; i++) { + long entryId = i + 1L; + int remainingUnacked = values[i][0]; + int stickyKeyHash = values[i][1]; + pendingAcksMap.addPendingAckIfAllowed(1L, entryId, remainingUnacked, stickyKeyHash); + expectedEntries.add(packedFieldsKey(entryId, remainingUnacked, stickyKeyHash)); + } + + List iteratedEntries = new ArrayList<>(); + pendingAcksMap.forEach((ledgerId, entryId, remainingUnacked, stickyKeyHash) -> + iteratedEntries.add(packedFieldsKey(entryId, remainingUnacked, stickyKeyHash))); + assertEquals(new HashSet<>(iteratedEntries), expectedEntries); + + for (int i = 0; i < values.length; i++) { + long entryId = i + 1L; + int remainingUnacked = values[i][0]; + int stickyKeyHash = values[i][1]; + + assertPackedFields(pendingAcksMap.get(1L, entryId), remainingUnacked, stickyKeyHash); + assertEquals(pendingAcksMap.getRemainingUnacked(1L, entryId), remainingUnacked); + + IntIntPair removed = pendingAcksMap.removeAndGet(1L, entryId); + assertPackedFields(removed, remainingUnacked, stickyKeyHash); + assertFalse(pendingAcksMap.contains(1L, entryId)); + } + } + + @Test + public void removeAllUpTo_PreservesPackedFieldsInCallback() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + int[][] values = new int[][] { + {7, 0}, + {8, -1}, + {9, Integer.MIN_VALUE}, + {10, Integer.MAX_VALUE}, + }; + Set expectedRemovedEntries = new HashSet<>(); + for (int i = 0; i < values.length; i++) { + long entryId = i + 1L; + int remainingUnacked = values[i][0]; + int stickyKeyHash = values[i][1]; + pendingAcksMap.addPendingAckIfAllowed(1L, entryId, remainingUnacked, stickyKeyHash); + expectedRemovedEntries.add(packedFieldsKey(entryId, remainingUnacked, stickyKeyHash)); + } + pendingAcksMap.addPendingAckIfAllowed(1L, values.length + 1L, 11, -123); + + List removedEntries = new ArrayList<>(); + pendingAcksMap.removeAllUpTo(1L, values.length, (ledgerId, entryId, remainingUnacked, stickyKeyHash) -> + removedEntries.add(packedFieldsKey(entryId, remainingUnacked, stickyKeyHash))); + + assertEquals(new HashSet<>(removedEntries), expectedRemovedEntries); + assertTrue(pendingAcksMap.contains(1L, values.length + 1L)); + } + + @Test + public void updateRemainingUnacked_PreservesPackedStickyKeyHash() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + int stickyKeyHash = Integer.MIN_VALUE; + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 100, stickyKeyHash); + + assertTrue(pendingAcksMap.updateRemainingUnacked(1L, 1L, 7)); + + assertPackedFields(pendingAcksMap.get(1L, 1L), 93, stickyKeyHash); + assertTrue(pendingAcksMap.remove(1L, 1L, 93, stickyKeyHash)); + assertFalse(pendingAcksMap.contains(1L, 1L)); + } + + @Test + public void getRemainingUnacked_ReturnsMinusOneWhenNotFound() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + + assertEquals(pendingAcksMap.getRemainingUnacked(1L, 1L), -1); + + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 0, 123); + + assertEquals(pendingAcksMap.getRemainingUnacked(1L, 1L), 0); + } + + @Test + public void removeAndGetRemainingUnacked_InvokesRemoveHandler() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap.PendingAcksRemoveHandler removeHandler = mock(PendingAcksMap.PendingAcksRemoveHandler.class); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> removeHandler); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123); + + int removed = pendingAcksMap.removeAndGetRemainingUnacked(1L, 1L); + + assertEquals(removed, 5); + assertEquals(pendingAcksMap.removeAndGetRemainingUnacked(1L, 1L), -1); + verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false); + } + + @Test + public void removeAllUpTo_RemovesBoundaryEntriesWithUnorderedInnerMap() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(2L, 11L, 1, 111); + pendingAcksMap.addPendingAckIfAllowed(2L, 3L, 1, 103); + pendingAcksMap.addPendingAckIfAllowed(2L, 7L, 1, 107); + pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 1, 101); + pendingAcksMap.addPendingAckIfAllowed(2L, 9L, 1, 109); + pendingAcksMap.addPendingAckIfAllowed(3L, 1L, 1, 201); + + pendingAcksMap.removeAllUpTo(2L, 7L, (ledgerId, entryId, batchSize, stickyKeyHash) -> { + }); + + assertFalse(pendingAcksMap.contains(2L, 1L)); + assertFalse(pendingAcksMap.contains(2L, 3L)); + assertFalse(pendingAcksMap.contains(2L, 7L)); + assertTrue(pendingAcksMap.contains(2L, 9L)); + assertTrue(pendingAcksMap.contains(2L, 11L)); + assertTrue(pendingAcksMap.contains(3L, 1L)); + } + + @Test + public void size_RemainsCorrectAcrossCommonOperations() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 2, 124); + assertEquals(pendingAcksMap.size(), 1); + + pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 3, 125); + assertEquals(pendingAcksMap.size(), 2); + + assertTrue(pendingAcksMap.updateRemainingUnacked(1L, 1L, 1)); + assertEquals(pendingAcksMap.size(), 2); + + assertFalse(pendingAcksMap.remove(1L, 1L, 99, 124)); + assertEquals(pendingAcksMap.size(), 2); + + assertTrue(pendingAcksMap.remove(1L, 1L)); + assertEquals(pendingAcksMap.size(), 1); + + pendingAcksMap.removeAllUpTo(2L, 1L, (ledgerId, entryId, batchSize, stickyKeyHash) -> { + }); + assertEquals(pendingAcksMap.size(), 0); + + pendingAcksMap.addPendingAckIfAllowed(3L, 1L, 1, 126); + pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> { + }); + assertEquals(pendingAcksMap.size(), 0); + } + + private static void assertPackedFields(IntIntPair actual, int remainingUnacked, int stickyKeyHash) { + assertTrue(actual != null); + assertEquals(actual.leftInt(), remainingUnacked); + assertEquals(actual.rightInt(), stickyKeyHash); + } + + private static String packedFieldsKey(long entryId, int remainingUnacked, int stickyKeyHash) { + return entryId + ":" + remainingUnacked + ":" + stickyKeyHash; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2LongMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2LongMap.java new file mode 100644 index 0000000000000..6806fac7f305f --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2LongMap.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import java.util.function.LongUnaryOperator; + +/** + * A map with primitive {@code long} keys and primitive {@code long} values. + * + *

The default return value for missing keys is {@code 0}. Use {@link #getOrDefault(long, long)} + * or {@link #containsKey(long)} when {@code 0} is a valid mapped value. + */ +public interface Long2LongMap { + + @FunctionalInterface + interface EntryConsumer { + void accept(long key, long value); + } + + @FunctionalInterface + interface EntryPredicate { + boolean test(long key, long value); + } + + /** + * Returns the value for the given key, or {@code 0} if not present. + * + * @param key the key + * @return the mapped value, or {@code 0} + */ + long get(long key); + + /** + * Associates the given value with the given key. + * + * @param key the key + * @param value the value + * @return the previous value, or {@code 0} if there was no mapping + */ + long put(long key, long value); + + /** + * Removes the mapping for the given key. + * + * @param key the key + * @return the previous value, or {@code 0} if there was no mapping + */ + long remove(long key); + + /** + * Returns the value for the given key, or the specified default if not present. + * + * @param key the key + * @param defaultValue the default value to return if the key is absent + * @return the mapped value, or {@code defaultValue} + */ + long getOrDefault(long key, long defaultValue); + + /** + * If the key is not already present, computes its value using the given function and inserts it. + * + * @param key the key + * @param mappingFunction the function to compute a value + * @return the current (existing or computed) value + */ + long computeIfAbsent(long key, LongUnaryOperator mappingFunction); + + /** + * Returns {@code true} if this map contains the given key. + * + * @param key the key + * @return {@code true} if this map contains the key + */ + boolean containsKey(long key); + + /** + * Returns {@code true} if this map contains no entries. + */ + boolean isEmpty(); + + /** + * Returns the number of entries in this map. + */ + int size(); + + /** + * Removes all entries from this map. + */ + void clear(); + + /** + * Iterates over all entries, calling the consumer with primitive long keys and values. + * + * @param consumer the consumer to call for each entry + */ + void forEach(EntryConsumer consumer); + + /** + * Removes each entry that matches the predicate. + * + * @param predicate the predicate to test entries + * @return the number of removed entries + */ + int removeIf(EntryPredicate predicate); +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMap.java new file mode 100644 index 0000000000000..ffe8649ef5d25 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMap.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import java.util.Arrays; +import java.util.function.LongUnaryOperator; + +/** + * Open-addressing hash map with primitive long keys and primitive long values. + * Uses linear probing and fibonacci hashing. + * Returns 0 for missing keys; use getOrDefault or containsKey when 0 is a valid mapped value. + * Not thread-safe. + */ +public class Long2LongOpenHashMap implements Long2LongMap { + + private static final float LOAD_FACTOR = 0.75f; + private static final int MIN_CAPACITY = 16; + + private long[] keys; + private long[] values; + private boolean[] used; + private int size; + private int capacity; + private int threshold; + + public Long2LongOpenHashMap() { + this(MIN_CAPACITY); + } + + public Long2LongOpenHashMap(int expectedItems) { + int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); + keys = new long[cap]; + values = new long[cap]; + used = new boolean[cap]; + capacity = cap; + threshold = (int) (cap * LOAD_FACTOR); + } + + @Override + public long get(long key) { + int idx = indexOf(key); + return idx >= 0 ? values[idx] : 0; + } + + @Override + public long put(long key, long value) { + int idx = indexOf(key); + if (idx >= 0) { + long old = values[idx]; + values[idx] = value; + return old; + } + if (size >= threshold) { + rehash(capacity * 2); + } + insertNew(key, value); + return 0; + } + + @Override + public long remove(long key) { + int idx = indexOf(key); + if (idx < 0) { + return 0; + } + long old = values[idx]; + removeAt(idx); + return old; + } + + @Override + public long getOrDefault(long key, long defaultValue) { + int idx = indexOf(key); + return idx >= 0 ? values[idx] : defaultValue; + } + + @Override + public long computeIfAbsent(long key, LongUnaryOperator mappingFunction) { + int idx = indexOf(key); + if (idx >= 0) { + return values[idx]; + } + long value = mappingFunction.applyAsLong(key); + if (size >= threshold) { + rehash(capacity * 2); + } + insertNew(key, value); + return value; + } + + @Override + public boolean containsKey(long key) { + return indexOf(key) >= 0; + } + + @Override + public boolean isEmpty() { + return size == 0; + } + + @Override + public int size() { + return size; + } + + @Override + public void clear() { + if (size > 0) { + Arrays.fill(used, false); + size = 0; + } + } + + @Override + public void forEach(EntryConsumer consumer) { + for (int i = 0; i < capacity; i++) { + if (used[i]) { + consumer.accept(keys[i], values[i]); + } + } + } + + @Override + public int removeIf(EntryPredicate predicate) { + int removed = 0; + for (int i = 0; i < capacity;) { + if (!used[i]) { + i++; + continue; + } + if (predicate.test(keys[i], values[i])) { + removeAt(i); + removed++; + } else { + i++; + } + } + return removed; + } + + private int indexOf(long key) { + int mask = capacity - 1; + int idx = Long2ObjectOpenHashMap.hash(key) & mask; + while (true) { + if (!used[idx]) { + return -1; + } + if (keys[idx] == key) { + return idx; + } + idx = (idx + 1) & mask; + } + } + + private void insertNew(long key, long value) { + int mask = capacity - 1; + int idx = Long2ObjectOpenHashMap.hash(key) & mask; + while (used[idx]) { + idx = (idx + 1) & mask; + } + keys[idx] = key; + values[idx] = value; + used[idx] = true; + size++; + } + + private void removeAt(int idx) { + int mask = capacity - 1; + size--; + int next = (idx + 1) & mask; + while (used[next]) { + int naturalSlot = Long2ObjectOpenHashMap.hash(keys[next]) & mask; + if ((next > idx && (naturalSlot <= idx || naturalSlot > next)) + || (next < idx && (naturalSlot <= idx && naturalSlot > next))) { + keys[idx] = keys[next]; + values[idx] = values[next]; + idx = next; + } + next = (next + 1) & mask; + } + used[idx] = false; + } + + private void rehash(int newCapacity) { + long[] oldKeys = keys; + long[] oldValues = values; + boolean[] oldUsed = used; + int oldCapacity = capacity; + + capacity = newCapacity; + keys = new long[newCapacity]; + values = new long[newCapacity]; + used = new boolean[newCapacity]; + threshold = (int) (newCapacity * LOAD_FACTOR); + size = 0; + + for (int i = 0; i < oldCapacity; i++) { + if (oldUsed[i]) { + insertNew(oldKeys[i], oldValues[i]); + } + } + } + + private static int tableSizeFor(int cap) { + int n = cap - 1; + n |= n >>> 1; + n |= n >>> 2; + n |= n >>> 4; + n |= n >>> 8; + n |= n >>> 16; + return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMapTest.java new file mode 100644 index 0000000000000..a94840ac7e11d --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMapTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import org.testng.annotations.Test; + +public class Long2LongOpenHashMapTest { + + @Test + public void testEmpty() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + assertTrue(map.isEmpty()); + assertEquals(map.size(), 0); + assertEquals(map.get(0), 0L); + assertFalse(map.containsKey(0)); + } + + @Test + public void testPutGet() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + assertEquals(map.put(1, 10), 0L); + assertEquals(map.put(2, Long.MAX_VALUE), 0L); + assertFalse(map.isEmpty()); + assertEquals(map.size(), 2); + assertTrue(map.containsKey(1)); + assertEquals(map.get(1), 10L); + assertEquals(map.get(2), Long.MAX_VALUE); + assertEquals(map.get(3), 0L); + } + + @Test + public void testPutReplace() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + map.put(1, 10); + assertEquals(map.put(1, 100), 10L); + assertEquals(map.get(1), 100L); + assertEquals(map.size(), 1); + } + + @Test + public void testRemove() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + map.put(1, 10); + map.put(2, 20); + assertEquals(map.remove(1), 10L); + assertFalse(map.containsKey(1)); + assertEquals(map.get(1), 0L); + assertEquals(map.remove(99), 0L); + assertEquals(map.size(), 1); + } + + @Test + public void testGetOrDefault() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + map.put(1, 10); + assertEquals(map.getOrDefault(1, -1), 10L); + assertEquals(map.getOrDefault(2, -1), -1L); + } + + @Test + public void testZeroValueCanBeDistinguishedFromMissingKey() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + map.put(1, 0); + + assertTrue(map.containsKey(1)); + assertEquals(map.get(1), 0L); + assertEquals(map.getOrDefault(1, -1), 0L); + assertFalse(map.containsKey(2)); + assertEquals(map.get(2), 0L); + assertEquals(map.getOrDefault(2, -1), -1L); + } + + @Test + public void testComputeIfAbsent() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + assertEquals(map.computeIfAbsent(1, k -> 10), 10L); + assertEquals(map.computeIfAbsent(1, k -> 99), 10L); + } + + @Test + public void testClear() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + map.put(1, 10); + map.put(2, 20); + map.clear(); + assertTrue(map.isEmpty()); + assertEquals(map.size(), 0); + assertEquals(map.get(1), 0L); + } + + @Test + public void testForEach() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + map.put(1, 10); + map.put(2, 20); + Map values = new HashMap<>(); + + map.forEach(values::put); + + assertEquals(values, Map.of(1L, 10L, 2L, 20L)); + } + + @Test + public void testRemoveIf() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(); + for (int i = 0; i < 100; i++) { + map.put(i, i * 10L); + } + + int removed = map.removeIf((key, value) -> key % 2 == 0); + + assertEquals(removed, 50); + assertEquals(map.size(), 50); + for (int i = 0; i < 100; i++) { + assertEquals(map.containsKey(i), i % 2 != 0); + } + } + + @Test + public void testRehash() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(4); + for (int i = 0; i < 100; i++) { + map.put(i, i * 10L); + } + assertEquals(map.size(), 100); + for (int i = 0; i < 100; i++) { + assertEquals(map.get(i), i * 10L); + } + } + + @Test + public void testRandomOperationsAgainstHashMap() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(4); + Map expected = new HashMap<>(); + Random random = new Random(0); + + for (int i = 0; i < 10_000; i++) { + long key = random.nextInt(512) - 256L; + switch (random.nextInt(5)) { + case 0 -> { + long value = random.nextLong(); + Long previous = expected.put(key, value); + assertEquals(map.put(key, value), previous == null ? 0L : previous.longValue()); + } + case 1 -> { + Long previous = expected.remove(key); + assertEquals(map.remove(key), previous == null ? 0L : previous.longValue()); + } + case 2 -> assertEquals(map.get(key), expected.getOrDefault(key, 0L).longValue()); + case 3 -> assertEquals(map.containsKey(key), expected.containsKey(key)); + case 4 -> { + long value = key * 37 + i; + Long previous = expected.get(key); + assertEquals(map.computeIfAbsent(key, ignored -> value), + previous == null ? value : previous.longValue()); + expected.putIfAbsent(key, value); + } + default -> throw new IllegalStateException("Unexpected random operation"); + } + + if (i % 257 == 0) { + int removed = map.removeIf((entryKey, value) -> (entryKey & 3) == 0); + int expectedRemoved = 0; + Iterator> iterator = expected.entrySet().iterator(); + while (iterator.hasNext()) { + if ((iterator.next().getKey() & 3) == 0) { + iterator.remove(); + expectedRemoved++; + } + } + assertEquals(removed, expectedRemoved); + } + + assertEquals(map.size(), expected.size()); + for (Map.Entry entry : expected.entrySet()) { + assertTrue(map.containsKey(entry.getKey())); + assertEquals(map.get(entry.getKey()), entry.getValue().longValue()); + } + } + } +} From 257f2ba7ca918a79b6b4aa4b6bc8c310a06e0bef Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Sun, 14 Jun 2026 00:10:05 +0800 Subject: [PATCH 2/6] [improve][broker] Add realistic pending ack benchmark datasets --- .../service/PendingAcksMapBenchmark.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java index 9b9444c9abe43..1ce6e5af146de 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java @@ -107,7 +107,9 @@ public static class MapState { @Param({"oldProduction", "production"}) private String implementation; - @Param({"64kEntries1kLedgers", "1mEntries16kLedgers"}) + @Param({"receiverQueue1kEntries1Ledger", "batchedReceiverQueue100Entries1Ledger", + "defaultUnacked50kEntries1Ledger", "defaultUnacked50kEntries5Ledgers", + "64kEntries1kLedgers", "1mEntries16kLedgers"}) private String dataset; private PendingAckStore store; @@ -133,7 +135,9 @@ public static class RangeState { @Param({"oldProduction", "production"}) private String implementation; - @Param({"64kEntries1kLedgers", "1mEntries16kLedgers"}) + @Param({"receiverQueue1kEntries1Ledger", "batchedReceiverQueue100Entries1Ledger", + "defaultUnacked50kEntries1Ledger", "defaultUnacked50kEntries5Ledgers", + "64kEntries1kLedgers", "1mEntries16kLedgers"}) private String dataset; private PendingAckStore store; @@ -155,7 +159,9 @@ public static class PopulateState { @Param({"oldProduction", "production"}) private String implementation; - @Param({"64kEntries1kLedgers", "1mEntries16kLedgers"}) + @Param({"receiverQueue1kEntries1Ledger", "batchedReceiverQueue100Entries1Ledger", + "defaultUnacked50kEntries1Ledger", "defaultUnacked50kEntries5Ledgers", + "64kEntries1kLedgers", "1mEntries16kLedgers"}) private String dataset; private int entries; @@ -175,12 +181,17 @@ public static class CursorState { private int next(int entries) { int current = index; - index = current + 1; - return current & (entries - 1); + int next = current + 1; + index = next == entries ? 0 : next; + return current; } } private enum Dataset { + RECEIVER_QUEUE_1K_ENTRIES_1_LEDGER("receiverQueue1kEntries1Ledger", 1_000, 1), + BATCHED_RECEIVER_QUEUE_100_ENTRIES_1_LEDGER("batchedReceiverQueue100Entries1Ledger", 100, 1), + DEFAULT_UNACKED_50K_ENTRIES_1_LEDGER("defaultUnacked50kEntries1Ledger", 50_000, 1), + DEFAULT_UNACKED_50K_ENTRIES_5_LEDGERS("defaultUnacked50kEntries5Ledgers", 50_000, 5), ENTRIES_64K_LEDGERS_1K("64kEntries1kLedgers", 65_536, 1_024), ENTRIES_1M_LEDGERS_16K("1mEntries16kLedgers", 1_048_576, 16_384); From b40c355b3977c9203a6a66723bf072daf34973eb Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Sun, 14 Jun 2026 00:37:57 +0800 Subject: [PATCH 3/6] [improve][broker] Refine pending ack benchmark coverage --- .../service/PendingAcksMapBenchmark.java | 58 ++++++++++++------- .../broker/service/PendingAcksMapTest.java | 29 ++++++++++ 2 files changed, 67 insertions(+), 20 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java index 1ce6e5af146de..c9d0c60df03bb 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java @@ -98,7 +98,7 @@ public long removeAllUpTo(RangeState state) { @Benchmark public void populate(PopulateState state, Blackhole blackhole) { PendingAckStore store = createStore(state.implementation); - populate(store, state.entries, state.ledgers, null, null); + populate(store, state.parsedDataset, null, null); blackhole.consume(store); } @@ -108,7 +108,9 @@ public static class MapState { private String implementation; @Param({"receiverQueue1kEntries1Ledger", "batchedReceiverQueue100Entries1Ledger", - "defaultUnacked50kEntries1Ledger", "defaultUnacked50kEntries5Ledgers", + "defaultUnacked50kEntries1Ledger", "defaultUnacked50kEntries2Ledgers", + "defaultUnacked50kEntries5Ledgers", "defaultUnacked50kEntries10Ledgers", + "defaultUnacked50kEntries20Ledgers", "residual1kEntries5Ledgers", "residual1kEntries100Ledgers", "64kEntries1kLedgers", "1mEntries16kLedgers"}) private String dataset; @@ -126,7 +128,7 @@ public void setup() { ledgerIds = new long[entries]; entryIds = new long[entries]; store = createStore(implementation); - populate(store, entries, ledgers, ledgerIds, entryIds); + populate(store, parsedDataset, ledgerIds, entryIds); } } @@ -136,7 +138,9 @@ public static class RangeState { private String implementation; @Param({"receiverQueue1kEntries1Ledger", "batchedReceiverQueue100Entries1Ledger", - "defaultUnacked50kEntries1Ledger", "defaultUnacked50kEntries5Ledgers", + "defaultUnacked50kEntries1Ledger", "defaultUnacked50kEntries2Ledgers", + "defaultUnacked50kEntries5Ledgers", "defaultUnacked50kEntries10Ledgers", + "defaultUnacked50kEntries20Ledgers", "residual1kEntries5Ledgers", "residual1kEntries100Ledgers", "64kEntries1kLedgers", "1mEntries16kLedgers"}) private String dataset; @@ -148,9 +152,9 @@ public static class RangeState { public void setup() { Dataset parsedDataset = Dataset.from(dataset); store = createStore(implementation); - populate(store, parsedDataset.entries, parsedDataset.ledgers, null, null); + populate(store, parsedDataset, null, null); markDeleteLedgerId = parsedDataset.ledgers / 2L; - markDeleteEntryId = parsedDataset.entries / parsedDataset.ledgers / 2L; + markDeleteEntryId = parsedDataset.entriesInLedger(markDeleteLedgerId) / 2L; } } @@ -160,18 +164,17 @@ public static class PopulateState { private String implementation; @Param({"receiverQueue1kEntries1Ledger", "batchedReceiverQueue100Entries1Ledger", - "defaultUnacked50kEntries1Ledger", "defaultUnacked50kEntries5Ledgers", + "defaultUnacked50kEntries1Ledger", "defaultUnacked50kEntries2Ledgers", + "defaultUnacked50kEntries5Ledgers", "defaultUnacked50kEntries10Ledgers", + "defaultUnacked50kEntries20Ledgers", "residual1kEntries5Ledgers", "residual1kEntries100Ledgers", "64kEntries1kLedgers", "1mEntries16kLedgers"}) private String dataset; - private int entries; - private int ledgers; + private Dataset parsedDataset; @Setup(Level.Trial) public void setup() { - Dataset parsedDataset = Dataset.from(dataset); - entries = parsedDataset.entries; - ledgers = parsedDataset.ledgers; + parsedDataset = Dataset.from(dataset); } } @@ -191,7 +194,12 @@ private enum Dataset { RECEIVER_QUEUE_1K_ENTRIES_1_LEDGER("receiverQueue1kEntries1Ledger", 1_000, 1), BATCHED_RECEIVER_QUEUE_100_ENTRIES_1_LEDGER("batchedReceiverQueue100Entries1Ledger", 100, 1), DEFAULT_UNACKED_50K_ENTRIES_1_LEDGER("defaultUnacked50kEntries1Ledger", 50_000, 1), + DEFAULT_UNACKED_50K_ENTRIES_2_LEDGERS("defaultUnacked50kEntries2Ledgers", 50_000, 2), DEFAULT_UNACKED_50K_ENTRIES_5_LEDGERS("defaultUnacked50kEntries5Ledgers", 50_000, 5), + DEFAULT_UNACKED_50K_ENTRIES_10_LEDGERS("defaultUnacked50kEntries10Ledgers", 50_000, 10), + DEFAULT_UNACKED_50K_ENTRIES_20_LEDGERS("defaultUnacked50kEntries20Ledgers", 50_000, 20), + RESIDUAL_1K_ENTRIES_5_LEDGERS("residual1kEntries5Ledgers", 1_000, 5), + RESIDUAL_1K_ENTRIES_100_LEDGERS("residual1kEntries100Ledgers", 1_000, 100), ENTRIES_64K_LEDGERS_1K("64kEntries1kLedgers", 65_536, 1_024), ENTRIES_1M_LEDGERS_16K("1mEntries16kLedgers", 1_048_576, 16_384); @@ -213,6 +221,12 @@ private static Dataset from(String name) { } throw new IllegalArgumentException("Unknown dataset: " + name); } + + private int entriesInLedger(long ledgerId) { + int baseEntries = entries / ledgers; + int extraEntries = entries % ledgers; + return baseEntries + (ledgerId < extraEntries ? 1 : 0); + } } private interface PendingAckStore { @@ -239,16 +253,20 @@ private static PendingAckStore createStore(String implementation) { }; } - private static void populate(PendingAckStore store, int entries, int ledgers, + private static void populate(PendingAckStore store, Dataset dataset, long[] ledgerIds, long[] entryIds) { - for (int i = 0; i < entries; i++) { - long ledgerId = i % ledgers; - long entryId = i / ledgers; - if (ledgerIds != null) { - ledgerIds[i] = ledgerId; - entryIds[i] = entryId; + int index = 0; + // Managed ledger entries are appended sequentially inside one ledger before rollover. + for (long ledgerId = 0; ledgerId < dataset.ledgers; ledgerId++) { + int entriesInLedger = dataset.entriesInLedger(ledgerId); + for (long entryId = 0; entryId < entriesInLedger; entryId++) { + if (ledgerIds != null) { + ledgerIds[index] = ledgerId; + entryIds[index] = entryId; + } + store.addPendingAckIfAllowed(ledgerId, entryId, remaining(index), stickyKeyHash(index)); + index++; } - store.addPendingAckIfAllowed(ledgerId, entryId, remaining(i), stickyKeyHash(i)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index c22ba403b0f59..3f99977f0ceb8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -436,6 +436,35 @@ public void removeAllUpTo_RemovesBoundaryEntriesWithUnorderedInnerMap() { assertTrue(pendingAcksMap.contains(3L, 1L)); } + @Test + public void removeAllUpTo_RemovesWholeLedgersAndUnorderedBoundaryEntries() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 10L, 1, 110); + pendingAcksMap.addPendingAckIfAllowed(2L, 5L, 1, 205); + pendingAcksMap.addPendingAckIfAllowed(2L, 15L, 1, 215); + pendingAcksMap.addPendingAckIfAllowed(3L, 20L, 1, 320); + pendingAcksMap.addPendingAckIfAllowed(3L, 2L, 1, 302); + pendingAcksMap.addPendingAckIfAllowed(3L, 8L, 1, 308); + pendingAcksMap.addPendingAckIfAllowed(3L, 11L, 1, 311); + pendingAcksMap.addPendingAckIfAllowed(4L, 1L, 1, 401); + + Set removedEntries = new HashSet<>(); + pendingAcksMap.removeAllUpTo(3L, 8L, (ledgerId, entryId, batchSize, stickyKeyHash) -> + removedEntries.add(ledgerId + ":" + entryId + ":" + stickyKeyHash)); + + assertEquals(removedEntries, Set.of("1:10:110", "2:5:205", "2:15:215", "3:2:302", "3:8:308")); + assertFalse(pendingAcksMap.contains(1L, 10L)); + assertFalse(pendingAcksMap.contains(2L, 5L)); + assertFalse(pendingAcksMap.contains(2L, 15L)); + assertFalse(pendingAcksMap.contains(3L, 2L)); + assertFalse(pendingAcksMap.contains(3L, 8L)); + assertTrue(pendingAcksMap.contains(3L, 11L)); + assertTrue(pendingAcksMap.contains(3L, 20L)); + assertTrue(pendingAcksMap.contains(4L, 1L)); + assertEquals(pendingAcksMap.size(), 3); + } + @Test public void size_RemainsCorrectAcrossCommonOperations() { Consumer consumer = createMockConsumer("consumer1"); From 25064db46828cfcc833aa198345c31e09b37c06d Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Sun, 14 Jun 2026 09:34:06 +0800 Subject: [PATCH 4/6] [improve][broker] Avoid mutable array holders in pending ack code --- .../service/PendingAcksMapBenchmark.java | 19 ++++++++++------- .../pulsar/broker/service/PendingAcksMap.java | 21 ++++++------------- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java index c9d0c60df03bb..7de1a6458e117 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java @@ -280,6 +280,11 @@ private static int stickyKeyHash(int index) { private static final class ProductionPendingAckStore implements PendingAckStore { private final PendingAcksMap pendingAcks = new PendingAcksMap(null, () -> null, () -> null); + private long total; + private final PendingAcksMap.PendingAcksConsumer sumAllConsumer = + (ledgerId, entryId, remainingUnacked, stickyKeyHash) -> total += remainingUnacked + stickyKeyHash; + private final PendingAcksMap.PendingAcksConsumer sumRemainingConsumer = + (ledgerId, entryId, remainingUnacked, stickyKeyHash) -> total += remainingUnacked; @Override public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remainingUnacked, @@ -309,18 +314,16 @@ public int removeAndGetRemainingUnacked(long ledgerId, long entryId) { @Override public long forEachAll() { - long[] total = new long[1]; - pendingAcks.forEach((ledgerId, entryId, remainingUnacked, stickyKeyHash) -> - total[0] += remainingUnacked + stickyKeyHash); - return total[0]; + total = 0; + pendingAcks.forEach(sumAllConsumer); + return total; } @Override public long removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) { - long[] total = new long[1]; - pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId, - (ledgerId, entryId, remainingUnacked, stickyKeyHash) -> total[0] += remainingUnacked); - return total[0]; + total = 0; + pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId, sumRemainingConsumer); + return total; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 9a482a979caf3..64fec113715ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.pulsar.common.util.collections.IntIntPair; import org.apache.pulsar.common.util.collections.Long2LongOpenHashMap; @@ -518,7 +519,7 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry retryWithWriteLock = true; return; } - if (ledgerId == markDeleteLedgerId && containsEntryUpTo(ledgerMap, markDeleteEntryId)) { + if (ledgerId == markDeleteLedgerId && !ledgerMap.isEmpty()) { retryWithWriteLock = true; return; } @@ -544,16 +545,16 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry }); ledgerMapIterator.remove(); } else { - boolean[] batchStartedHolder = new boolean[]{batchStarted}; + MutableBoolean batchStartedHolder = new MutableBoolean(batchStarted); int removed = ledgerMap.removeIf((entryId, packedValue) -> { if (entryId > markDeleteEntryId) { return false; } int stickyKeyHash = unpackStickyKeyHash(packedValue); if (pendingAcksRemoveHandler != null) { - if (!batchStartedHolder[0]) { + if (!batchStartedHolder.booleanValue()) { pendingAcksRemoveHandler.startBatch(); - batchStartedHolder[0] = true; + batchStartedHolder.setTrue(); } pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, stickyKeyHash, closed); @@ -564,7 +565,7 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } return true; }); - batchStarted = batchStartedHolder[0]; + batchStarted = batchStartedHolder.booleanValue(); if (removed > 0 && ledgerMap.isEmpty()) { ledgerMapIterator.remove(); } @@ -585,16 +586,6 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } } - private static boolean containsEntryUpTo(Long2LongOpenHashMap ledgerMap, long maxEntryId) { - boolean[] found = new boolean[1]; - ledgerMap.forEach((entryId, ignoredValue) -> { - if (entryId <= maxEntryId) { - found[0] = true; - } - }); - return found[0]; - } - // A packed value can legitimately be 0, so do not use Long2LongOpenHashMap.get() for lookups here. private static long getPackedPendingAckOrNotFound(Long2LongOpenHashMap ledgerMap, long entryId) { return ledgerMap.getOrDefault(entryId, PACKED_PENDING_ACK_NOT_FOUND); From 706d2ab157c6ae3e94b5d26ee37665bbbff47656 Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Sun, 14 Jun 2026 15:32:10 +0800 Subject: [PATCH 5/6] [improve][broker] Speed up PendingAcksMap prefix cleanup --- .../service/PendingAcksMapBenchmark.java | 156 ++++++++++++- .../pulsar/broker/service/PendingAcksMap.java | 217 +++++++++++++----- .../broker/service/PendingAcksMapTest.java | 59 +++++ 3 files changed, 371 insertions(+), 61 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java index 7de1a6458e117..8b8817d86521a 100644 --- a/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java +++ b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapBenchmark.java @@ -95,6 +95,11 @@ public long removeAllUpTo(RangeState state) { return state.store.removeAllUpTo(state.markDeleteLedgerId, state.markDeleteEntryId); } + @Benchmark + public long removeAllUpToSameLedger(CleanupState state) { + return state.store.removeAllUpTo(state.markDeleteLedgerId, state.markDeleteEntryId); + } + @Benchmark public void populate(PopulateState state, Blackhole blackhole) { PendingAckStore store = createStore(state.implementation); @@ -102,6 +107,20 @@ public void populate(PopulateState state, Blackhole blackhole) { blackhole.consume(store); } + @Benchmark + public long dispatchAndAckCycle(RollingWindowState state) { + return state.dispatchAndAckCycle(); + } + + @Benchmark + public long dispatchAckAndPartialAckCycle(RollingWindowState state) { + long result = state.dispatchAndAckCycle(); + if (state.shouldApplyPartialAck()) { + result += state.applyPartialAck(); + } + return result; + } + @State(Scope.Benchmark) public static class MapState { @Param({"oldProduction", "production"}) @@ -178,6 +197,128 @@ public void setup() { } } + @State(Scope.Thread) + public static class CleanupState { + @Param({"oldProduction", "production"}) + private String implementation; + + @Param({"receiverQueue1kEntries1Ledger", "defaultUnacked50kEntries1Ledger"}) + private String dataset; + + @Param({"beforePendingWindow", "smallPrefix"}) + private String scenario; + + private PendingAckStore store; + private long markDeleteLedgerId; + private long markDeleteEntryId; + + @Setup(Level.Invocation) + public void setup() { + Dataset parsedDataset = Dataset.from(dataset); + int prefixEntries = Math.max(1, parsedDataset.entries / 50); + store = createStore(implementation); + markDeleteLedgerId = 0; + switch (scenario) { + case "beforePendingWindow" -> { + populateSingleLedger(store, prefixEntries, parsedDataset.entries); + markDeleteEntryId = prefixEntries - 1L; + } + case "smallPrefix" -> { + populateSingleLedger(store, 0, parsedDataset.entries); + markDeleteEntryId = prefixEntries - 1L; + } + default -> throw new IllegalArgumentException("Unknown cleanup scenario: " + scenario); + } + } + } + + @State(Scope.Thread) + public static class RollingWindowState { + private static final int PARTIAL_ACK_INTERVAL = 16; + + @Param({"oldProduction", "production"}) + private String implementation; + + @Param({"receiverQueue1kEntries1Ledger", "batchedReceiverQueue100Entries1Ledger", + "defaultUnacked50kEntries1Ledger", "defaultUnacked50kEntries2Ledgers", + "defaultUnacked50kEntries5Ledgers", "defaultUnacked50kEntries10Ledgers", + "defaultUnacked50kEntries20Ledgers"}) + private String dataset; + + private PendingAckStore store; + private long[] ledgerIds; + private long[] entryIds; + private int[] remainingUnacked; + private int entries; + private long entriesPerLedger; + private int cursor; + private long nextSequence; + private long operations; + + @Setup(Level.Trial) + public void setup() { + Dataset parsedDataset = Dataset.from(dataset); + entries = parsedDataset.entries; + entriesPerLedger = Math.max(1, (entries + parsedDataset.ledgers - 1L) / parsedDataset.ledgers); + ledgerIds = new long[entries]; + entryIds = new long[entries]; + remainingUnacked = new int[entries]; + store = createStore(implementation); + for (int i = 0; i < entries; i++) { + setSlot(i, i); + store.addPendingAckIfAllowed(ledgerIds[i], entryIds[i], remainingUnacked[i], stickyKeyHash(i)); + } + nextSequence = entries; + } + + private long dispatchAndAckCycle() { + int slot = cursor; + cursor = nextCursor(slot); + + long ledgerIdToAck = ledgerIds[slot]; + long entryIdToAck = entryIds[slot]; + int removed = store.removeAndGetRemainingUnacked(ledgerIdToAck, entryIdToAck); + + long sequence = nextSequence++; + setSlot(slot, sequence); + store.addPendingAckIfAllowed(ledgerIds[slot], entryIds[slot], + remainingUnacked[slot], stickyKeyHash(sequence)); + return removed + remainingUnacked[slot]; + } + + private boolean shouldApplyPartialAck() { + return (operations++ & (PARTIAL_ACK_INTERVAL - 1)) == 0; + } + + private long applyPartialAck() { + int slot = cursor + entries / 2; + if (slot >= entries) { + slot -= entries; + } + int remaining = remainingUnacked[slot]; + if (remaining <= 1) { + return 0; + } + boolean updated = store.updateRemainingUnacked(ledgerIds[slot], entryIds[slot], 1); + if (!updated) { + return 0; + } + remainingUnacked[slot] = remaining - 1; + return remaining; + } + + private void setSlot(int slot, long sequence) { + ledgerIds[slot] = sequence / entriesPerLedger; + entryIds[slot] = sequence % entriesPerLedger; + remainingUnacked[slot] = remaining(sequence); + } + + private int nextCursor(int current) { + int next = current + 1; + return next == entries ? 0 : next; + } + } + @State(Scope.Thread) public static class CursorState { private int index; @@ -270,12 +411,19 @@ private static void populate(PendingAckStore store, Dataset dataset, } } - private static int remaining(int index) { - return (index & 15) + 1; + private static void populateSingleLedger(PendingAckStore store, long firstEntryId, int entries) { + for (int index = 0; index < entries; index++) { + long entryId = firstEntryId + index; + store.addPendingAckIfAllowed(0, entryId, remaining(index), stickyKeyHash(index)); + } + } + + private static int remaining(long index) { + return (int) (index & 15) + 1; } - private static int stickyKeyHash(int index) { - return index * 31; + private static int stickyKeyHash(long index) { + return (int) (index * 31); } private static final class ProductionPendingAckStore implements PendingAckStore { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 64fec113715ef..8c361b20a0e46 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import java.util.BitSet; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; @@ -99,7 +100,7 @@ public interface PendingAcksConsumer { } private final Consumer consumer; - private final TreeMap pendingAcks; + private final TreeMap pendingAcks; private final Supplier pendingAcksAddHandlerSupplier; private final Supplier pendingAcksRemoveHandlerSupplier; private final Lock readLock; @@ -153,8 +154,8 @@ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remaining && !pendingAcksAddHandler.handleAdding(consumer, ledgerId, entryId, stickyKeyHash)) { return false; } - Long2LongOpenHashMap ledgerPendingAcks = - pendingAcks.computeIfAbsent(ledgerId, k -> new Long2LongOpenHashMap()); + LedgerPendingAcks ledgerPendingAcks = + pendingAcks.computeIfAbsent(ledgerId, k -> new LedgerPendingAcks()); ledgerPendingAcks.put(entryId, packPendingAckValue(remainingUnacked, stickyKeyHash)); return true; } finally { @@ -170,7 +171,7 @@ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remaining public long size() { try { readLock.lock(); - return pendingAcks.values().stream().mapToInt(Long2LongOpenHashMap::size).sum(); + return pendingAcks.values().stream().mapToInt(LedgerPendingAcks::size).sum(); } finally { readLock.unlock(); } @@ -194,9 +195,9 @@ public void forEach(PendingAcksConsumer processor) { private void processPendingAcks(PendingAcksConsumer processor) { // this code uses for loops intentionally, don't refactor to use forEach // iterate the outer map - for (Map.Entry entry : pendingAcks.entrySet()) { + for (Map.Entry entry : pendingAcks.entrySet()) { long ledgerId = entry.getKey(); - Long2LongOpenHashMap ledgerPendingAcks = entry.getValue(); + LedgerPendingAcks ledgerPendingAcks = entry.getValue(); // iterate the inner map ledgerPendingAcks.forEach((entryId, packedValue) -> processor.accept(ledgerId, entryId, unpackRemainingUnacked(packedValue), @@ -261,7 +262,7 @@ private void internalForEachAndClear(PendingAcksConsumer processor, boolean clos public boolean contains(long ledgerId, long entryId) { try { readLock.lock(); - Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); + LedgerPendingAcks ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -281,7 +282,7 @@ public boolean contains(long ledgerId, long entryId) { public IntIntPair get(long ledgerId, long entryId) { try { readLock.lock(); - Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); + LedgerPendingAcks ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return null; } @@ -302,7 +303,7 @@ public IntIntPair get(long ledgerId, long entryId) { public int getRemainingUnacked(long ledgerId, long entryId) { try { readLock.lock(); - Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); + LedgerPendingAcks ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return PENDING_ACK_NOT_FOUND; } @@ -326,7 +327,7 @@ public int getRemainingUnacked(long ledgerId, long entryId) { public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyHash) { try { writeLock.lock(); - Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); + LedgerPendingAcks ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -337,7 +338,7 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH boolean removed = unpackRemainingUnacked(packedValue) == batchSize && unpackStickyKeyHash(packedValue) == stickyKeyHash; if (removed) { - ledgerMap.remove(entryId); + ledgerMap.removePresent(entryId); handleRemovePendingAck(ledgerId, entryId, stickyKeyHash); } if (removed && ledgerMap.isEmpty()) { @@ -361,7 +362,7 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { try { writeLock.lock(); - Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); + LedgerPendingAcks ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -387,7 +388,7 @@ public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelt public boolean remove(long ledgerId, long entryId) { try { writeLock.lock(); - Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); + LedgerPendingAcks ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -395,7 +396,7 @@ public boolean remove(long ledgerId, long entryId) { if (isPackedPendingAckNotFound(removedEntry)) { return false; } - ledgerMap.remove(entryId); + ledgerMap.removePresent(entryId); handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); if (ledgerMap.isEmpty()) { pendingAcks.remove(ledgerId); @@ -418,7 +419,7 @@ public boolean remove(long ledgerId, long entryId) { public IntIntPair removeAndGet(long ledgerId, long entryId) { try { writeLock.lock(); - Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); + LedgerPendingAcks ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return null; } @@ -426,7 +427,7 @@ public IntIntPair removeAndGet(long ledgerId, long entryId) { if (isPackedPendingAckNotFound(removedEntry)) { return null; } - ledgerMap.remove(entryId); + ledgerMap.removePresent(entryId); handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); if (ledgerMap.isEmpty()) { pendingAcks.remove(ledgerId); @@ -447,7 +448,7 @@ public IntIntPair removeAndGet(long ledgerId, long entryId) { public int removeAndGetRemainingUnacked(long ledgerId, long entryId) { try { writeLock.lock(); - Long2LongOpenHashMap ledgerMap = pendingAcks.get(ledgerId); + LedgerPendingAcks ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return PENDING_ACK_NOT_FOUND; } @@ -455,7 +456,7 @@ public int removeAndGetRemainingUnacked(long ledgerId, long entryId) { if (isPackedPendingAckNotFound(removedEntry)) { return PENDING_ACK_NOT_FOUND; } - ledgerMap.remove(entryId); + ledgerMap.removePresent(entryId); handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); if (ledgerMap.isEmpty()) { pendingAcks.remove(ledgerId); @@ -508,63 +509,36 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } else { readLock.lock(); } - Iterator> ledgerMapIterator = + Iterator> ledgerMapIterator = pendingAcks.headMap(markDeleteLedgerId, true).entrySet().iterator(); while (ledgerMapIterator.hasNext()) { - Map.Entry entry = ledgerMapIterator.next(); + Map.Entry entry = ledgerMapIterator.next(); long ledgerId = entry.getKey(); - Long2LongOpenHashMap ledgerMap = entry.getValue(); + LedgerPendingAcks ledgerMap = entry.getValue(); if (!acquiredWriteLock) { if (ledgerId < markDeleteLedgerId && !ledgerMap.isEmpty()) { retryWithWriteLock = true; return; } - if (ledgerId == markDeleteLedgerId && !ledgerMap.isEmpty()) { + if (ledgerId == markDeleteLedgerId && ledgerMap.hasEntryUpTo(markDeleteEntryId)) { retryWithWriteLock = true; return; } continue; } if (ledgerId < markDeleteLedgerId) { - if (!ledgerMap.isEmpty() && pendingAcksRemoveHandler != null) { - if (!batchStarted) { - pendingAcksRemoveHandler.startBatch(); - batchStarted = true; - } - } + MutableBoolean batchStartedHolder = new MutableBoolean(batchStarted); ledgerMap.forEach((entryId, packedValue) -> { - int stickyKeyHash = unpackStickyKeyHash(packedValue); - if (pendingAcksRemoveHandler != null) { - pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, - stickyKeyHash, closed); - } - if (removedEntryCallback != null) { - removedEntryCallback.accept(ledgerId, entryId, - unpackRemainingUnacked(packedValue), stickyKeyHash); - } + handleRemovedEntry(ledgerId, entryId, packedValue, pendingAcksRemoveHandler, + batchStartedHolder, removedEntryCallback); }); + batchStarted = batchStartedHolder.booleanValue(); ledgerMapIterator.remove(); } else { MutableBoolean batchStartedHolder = new MutableBoolean(batchStarted); - int removed = ledgerMap.removeIf((entryId, packedValue) -> { - if (entryId > markDeleteEntryId) { - return false; - } - int stickyKeyHash = unpackStickyKeyHash(packedValue); - if (pendingAcksRemoveHandler != null) { - if (!batchStartedHolder.booleanValue()) { - pendingAcksRemoveHandler.startBatch(); - batchStartedHolder.setTrue(); - } - pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, - stickyKeyHash, closed); - } - if (removedEntryCallback != null) { - removedEntryCallback.accept(ledgerId, entryId, - unpackRemainingUnacked(packedValue), stickyKeyHash); - } - return true; - }); + int removed = ledgerMap.removeUpTo(markDeleteEntryId, (entryId, packedValue) -> + handleRemovedEntry(ledgerId, entryId, packedValue, pendingAcksRemoveHandler, + batchStartedHolder, removedEntryCallback)); batchStarted = batchStartedHolder.booleanValue(); if (removed > 0 && ledgerMap.isEmpty()) { ledgerMapIterator.remove(); @@ -587,7 +561,7 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } // A packed value can legitimately be 0, so do not use Long2LongOpenHashMap.get() for lookups here. - private static long getPackedPendingAckOrNotFound(Long2LongOpenHashMap ledgerMap, long entryId) { + private static long getPackedPendingAckOrNotFound(LedgerPendingAcks ledgerMap, long entryId) { return ledgerMap.getOrDefault(entryId, PACKED_PENDING_ACK_NOT_FOUND); } @@ -617,4 +591,133 @@ private void handleRemovePendingAck(long ledgerId, long entryId, int stickyKeyHa pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, stickyKeyHash, closed); } } + + private void handleRemovedEntry(long ledgerId, long entryId, long packedValue, + PendingAcksRemoveHandler pendingAcksRemoveHandler, + MutableBoolean batchStartedHolder, + PendingAcksConsumer removedEntryCallback) { + int stickyKeyHash = unpackStickyKeyHash(packedValue); + if (pendingAcksRemoveHandler != null) { + if (!batchStartedHolder.booleanValue()) { + pendingAcksRemoveHandler.startBatch(); + batchStartedHolder.setTrue(); + } + pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, stickyKeyHash, closed); + } + if (removedEntryCallback != null) { + removedEntryCallback.accept(ledgerId, entryId, unpackRemainingUnacked(packedValue), stickyKeyHash); + } + } + + private static final class LedgerPendingAcks { + private static final int MAX_INDEXED_ENTRY_ID = 1 << 20; + private final Long2LongOpenHashMap entries = new Long2LongOpenHashMap(); + private final BitSet entryIdIndex = new BitSet(); + private boolean entryIdIndexEnabled = true; + private long maxEntryId = Long.MIN_VALUE; + + private void put(long entryId, long packedValue) { + entries.put(entryId, packedValue); + maxEntryId = Math.max(maxEntryId, entryId); + if (!entryIdIndexEnabled) { + return; + } + if (canIndexEntryId(entryId)) { + entryIdIndex.set((int) entryId); + } else { + entryIdIndex.clear(); + entryIdIndexEnabled = false; + } + } + + private long getOrDefault(long entryId, long defaultValue) { + return entries.getOrDefault(entryId, defaultValue); + } + + private boolean containsKey(long entryId) { + return entries.containsKey(entryId); + } + + private boolean isEmpty() { + return entries.isEmpty(); + } + + private int size() { + return entries.size(); + } + + private void forEach(Long2LongOpenHashMap.EntryConsumer consumer) { + entries.forEach(consumer); + } + + private void removePresent(long entryId) { + entries.remove(entryId); + if (entryIdIndexEnabled && canIndexEntryId(entryId)) { + entryIdIndex.clear((int) entryId); + } + } + + private boolean hasEntryUpTo(long markDeleteEntryId) { + if (entries.isEmpty() || markDeleteEntryId < 0) { + return false; + } + if (markDeleteEntryId >= maxEntryId) { + return true; + } + if (!entryIdIndexEnabled) { + return true; + } + int firstEntryId = entryIdIndex.nextSetBit(0); + return firstEntryId >= 0 && firstEntryId <= markDeleteEntryId; + } + + private int removeUpTo(long markDeleteEntryId, RemovedEntryConsumer removedEntryConsumer) { + if (!hasEntryUpTo(markDeleteEntryId)) { + return 0; + } + if (markDeleteEntryId >= maxEntryId) { + entries.forEach(removedEntryConsumer::accept); + int removed = entries.size(); + entries.clear(); + entryIdIndex.clear(); + return removed; + } + if (entryIdIndexEnabled && canIndexEntryId(markDeleteEntryId)) { + return removeIndexedPrefix((int) markDeleteEntryId, removedEntryConsumer); + } + return entries.removeIf((entryId, packedValue) -> { + if (entryId > markDeleteEntryId) { + return false; + } + removedEntryConsumer.accept(entryId, packedValue); + return true; + }); + } + + private int removeIndexedPrefix(int markDeleteEntryId, RemovedEntryConsumer removedEntryConsumer) { + int removed = 0; + int indexedEntryId = entryIdIndex.nextSetBit(0); + while (indexedEntryId >= 0 && indexedEntryId <= markDeleteEntryId) { + long entryId = indexedEntryId; + long packedValue = entries.getOrDefault(entryId, PACKED_PENDING_ACK_NOT_FOUND); + if (!isPackedPendingAckNotFound(packedValue)) { + entries.remove(entryId); + removedEntryConsumer.accept(entryId, packedValue); + removed++; + } + indexedEntryId = entryIdIndex.nextSetBit(indexedEntryId + 1); + } + entryIdIndex.clear(0, markDeleteEntryId + 1); + return removed; + } + + private static boolean canIndexEntryId(long entryId) { + return entryId >= 0 && entryId <= MAX_INDEXED_ENTRY_ID; + } + } + + @FunctionalInterface + private interface RemovedEntryConsumer { + void accept(long entryId, long packedValue); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index 3f99977f0ceb8..94d6fdb3bcc78 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -436,6 +436,65 @@ public void removeAllUpTo_RemovesBoundaryEntriesWithUnorderedInnerMap() { assertTrue(pendingAcksMap.contains(3L, 1L)); } + @Test + public void removeAllUpTo_KeepsSameLedgerEntriesAfterMarkDelete() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(2L, 10L, 1, 110); + pendingAcksMap.addPendingAckIfAllowed(2L, 11L, 1, 111); + pendingAcksMap.addPendingAckIfAllowed(2L, 12L, 1, 112); + + assertEquals(pendingAcksMap.removeAndGetRemainingUnacked(2L, 10L), 1); + List removedEntries = new ArrayList<>(); + pendingAcksMap.removeAllUpTo(2L, 10L, (ledgerId, entryId, batchSize, stickyKeyHash) -> + removedEntries.add(entryId)); + + assertTrue(removedEntries.isEmpty()); + assertTrue(pendingAcksMap.contains(2L, 11L)); + assertTrue(pendingAcksMap.contains(2L, 12L)); + assertEquals(pendingAcksMap.size(), 2); + } + + @Test + public void removeAllUpTo_HandlesLargeEntryGapsAfterFirstEntryRemoval() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 1, 101); + pendingAcksMap.addPendingAckIfAllowed(2L, 5000L, 1, 150); + pendingAcksMap.addPendingAckIfAllowed(2L, 6000L, 1, 160); + + assertTrue(pendingAcksMap.remove(2L, 1L)); + pendingAcksMap.removeAllUpTo(2L, 4999L, (ledgerId, entryId, batchSize, stickyKeyHash) -> { + }); + assertTrue(pendingAcksMap.contains(2L, 5000L)); + assertTrue(pendingAcksMap.contains(2L, 6000L)); + + pendingAcksMap.removeAllUpTo(2L, 5000L, (ledgerId, entryId, batchSize, stickyKeyHash) -> { + }); + assertFalse(pendingAcksMap.contains(2L, 5000L)); + assertTrue(pendingAcksMap.contains(2L, 6000L)); + assertEquals(pendingAcksMap.size(), 1); + } + + @Test + public void removeAllUpTo_HandlesEntryIdsOutsideBitmapIndexRange() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + long firstLargeEntryId = (long) Integer.MAX_VALUE + 5L; + long secondLargeEntryId = firstLargeEntryId + 10L; + pendingAcksMap.addPendingAckIfAllowed(2L, firstLargeEntryId, 1, 101); + pendingAcksMap.addPendingAckIfAllowed(2L, secondLargeEntryId, 1, 102); + + List removedEntries = new ArrayList<>(); + pendingAcksMap.removeAllUpTo(2L, firstLargeEntryId, (ledgerId, entryId, batchSize, stickyKeyHash) -> + removedEntries.add(entryId)); + + assertEquals(removedEntries, List.of(firstLargeEntryId)); + assertFalse(pendingAcksMap.contains(2L, firstLargeEntryId)); + assertTrue(pendingAcksMap.contains(2L, secondLargeEntryId)); + assertEquals(pendingAcksMap.size(), 1); + } + @Test public void removeAllUpTo_RemovesWholeLedgersAndUnorderedBoundaryEntries() { Consumer consumer = createMockConsumer("consumer1"); From 82772aef78d73aea95b9e8e687241c9adc780599 Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Sun, 14 Jun 2026 16:21:17 +0800 Subject: [PATCH 6/6] [test][common] Harden Long2LongOpenHashMap tests --- .../collections/Long2LongOpenHashMapTest.java | 211 +++++++++++++++--- 1 file changed, 183 insertions(+), 28 deletions(-) diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMapTest.java index a94840ac7e11d..c2c6e428db5e6 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMapTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2LongOpenHashMapTest.java @@ -21,10 +21,16 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import org.testng.Reporter; import org.testng.annotations.Test; public class Long2LongOpenHashMapTest { @@ -93,6 +99,28 @@ public void testZeroValueCanBeDistinguishedFromMissingKey() { assertEquals(map.getOrDefault(2, -1), -1L); } + @Test + public void testEdgeKeysAndValuesRoundTrip() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(4); + Map expected = new HashMap<>(); + long[][] entries = { + {0L, 0L}, + {Long.MIN_VALUE, Long.MIN_VALUE}, + {Long.MAX_VALUE, Long.MAX_VALUE}, + {-1L, 1L}, + {1L, -1L}, + {Long.MIN_VALUE + 1, Long.MAX_VALUE - 1}, + {Long.MAX_VALUE - 1, Long.MIN_VALUE + 1} + }; + + for (long[] entry : entries) { + expected.put(entry[0], entry[1]); + assertEquals(map.put(entry[0], entry[1]), 0L); + } + + assertLong2LongMapMatches(expected, expected.keySet(), map, "edge values"); + } + @Test public void testComputeIfAbsent() { Long2LongOpenHashMap map = new Long2LongOpenHashMap(); @@ -152,53 +180,180 @@ public void testRehash() { } @Test - public void testRandomOperationsAgainstHashMap() { + public void testRemovePreservesProbeChainWithCollisions() { + Long2LongOpenHashMap map = new Long2LongOpenHashMap(4); + List keys = collidingLongKeys(16, 12); + + for (int i = 0; i < keys.size(); i++) { + assertEquals(map.put(keys.get(i), valueForIndex(i)), 0L); + } + + assertEquals(map.remove(keys.get(0)), valueForIndex(0)); + assertEquals(map.remove(keys.get(5)), valueForIndex(5)); + assertEquals(map.remove(keys.get(11)), valueForIndex(11)); + + for (int i = 1; i < keys.size() - 1; i++) { + long key = keys.get(i); + if (i != 5) { + assertEquals(map.get(key), valueForIndex(i)); + assertTrue(map.containsKey(key)); + } + } + assertFalse(map.containsKey(keys.get(0))); + assertFalse(map.containsKey(keys.get(5))); + assertFalse(map.containsKey(keys.get(11))); + + assertEquals(map.put(keys.get(5), Long.MIN_VALUE), 0L); + assertEquals(map.getOrDefault(keys.get(5), -1L), Long.MIN_VALUE); + } + + @Test + public void testRandomizedOperationsAgainstHashMap() { Long2LongOpenHashMap map = new Long2LongOpenHashMap(4); Map expected = new HashMap<>(); - Random random = new Random(0); + Set seenKeys = new HashSet<>(); + long seed = randomSeed("testRandomizedOperationsAgainstHashMap"); + Random random = new Random(seed); - for (int i = 0; i < 10_000; i++) { - long key = random.nextInt(512) - 256L; - switch (random.nextInt(5)) { + for (int i = 0; i < 20_000; i++) { + long key = randomLongWithEdgeCases(random, 512); + seenKeys.add(key); + String context = "seed=" + seed + " iteration=" + i + " key=" + key; + + switch (random.nextInt(100)) { case 0 -> { - long value = random.nextLong(); + long value = randomValue(random); Long previous = expected.put(key, value); - assertEquals(map.put(key, value), previous == null ? 0L : previous.longValue()); + assertEquals(map.put(key, value), previous == null ? 0L : previous.longValue(), context); } case 1 -> { Long previous = expected.remove(key); - assertEquals(map.remove(key), previous == null ? 0L : previous.longValue()); + assertEquals(map.remove(key), previous == null ? 0L : previous.longValue(), context); } - case 2 -> assertEquals(map.get(key), expected.getOrDefault(key, 0L).longValue()); - case 3 -> assertEquals(map.containsKey(key), expected.containsKey(key)); - case 4 -> { - long value = key * 37 + i; + case 2 -> { + long value = randomValue(random); Long previous = expected.get(key); assertEquals(map.computeIfAbsent(key, ignored -> value), - previous == null ? value : previous.longValue()); + previous == null ? value : previous.longValue(), context); expected.putIfAbsent(key, value); } - default -> throw new IllegalStateException("Unexpected random operation"); + case 3 -> assertEquals(map.get(key), expected.getOrDefault(key, 0L).longValue(), context); + case 4 -> { + long defaultValue = randomValue(random); + assertEquals(map.getOrDefault(key, defaultValue), + expected.getOrDefault(key, defaultValue).longValue(), context); + } + case 5 -> assertEquals(map.containsKey(key), expected.containsKey(key), context); + case 6 -> runRemoveIfScenario(map, expected, random, context); + case 7 -> { + map.clear(); + expected.clear(); + } + default -> { + long otherKey = randomLongWithEdgeCases(random, 512); + seenKeys.add(otherKey); + long value = randomValue(random); + Long previous = expected.put(otherKey, value); + assertEquals(map.put(otherKey, value), previous == null ? 0L : previous.longValue(), + context + " otherKey=" + otherKey); + } } if (i % 257 == 0) { - int removed = map.removeIf((entryKey, value) -> (entryKey & 3) == 0); - int expectedRemoved = 0; - Iterator> iterator = expected.entrySet().iterator(); - while (iterator.hasNext()) { - if ((iterator.next().getKey() & 3) == 0) { - iterator.remove(); - expectedRemoved++; - } - } - assertEquals(removed, expectedRemoved); + runRemoveIfScenario(map, expected, random, context + " periodicRemoveIf"); + } + + assertLong2LongMapMatches(expected, seenKeys, map, context); + } + } + + private static void runRemoveIfScenario(Long2LongOpenHashMap map, Map expected, Random random, + String context) { + int selector = random.nextInt(4); + int removed = map.removeIf((entryKey, value) -> removeIfMatches(selector, entryKey, value)); + + int expectedRemoved = 0; + Iterator> iterator = expected.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (removeIfMatches(selector, entry.getKey(), entry.getValue())) { + iterator.remove(); + expectedRemoved++; } + } + assertEquals(removed, expectedRemoved, context + " removeIfSelector=" + selector); + } + + private static boolean removeIfMatches(int selector, long key, long value) { + return switch (selector) { + case 0 -> (key & 3L) == 0; + case 1 -> (value & 7L) == 0; + case 2 -> key < 0 && value <= 0; + case 3 -> key == Long.MIN_VALUE || value == Long.MAX_VALUE; + default -> throw new IllegalArgumentException("Unknown selector: " + selector); + }; + } + + private static void assertLong2LongMapMatches(Map expected, Iterable seenKeys, + Long2LongOpenHashMap actual, String context) { + long missingValue = 0x5A5A_5A5A_5A5A_5A5AL; + assertEquals(actual.isEmpty(), expected.isEmpty(), context); + assertEquals(actual.size(), expected.size(), context); - assertEquals(map.size(), expected.size()); - for (Map.Entry entry : expected.entrySet()) { - assertTrue(map.containsKey(entry.getKey())); - assertEquals(map.get(entry.getKey()), entry.getValue().longValue()); + for (long key : seenKeys) { + Long expectedValue = expected.get(key); + assertEquals(actual.containsKey(key), expectedValue != null, context + " checkedKey=" + key); + assertEquals(actual.get(key), expectedValue == null ? 0L : expectedValue.longValue(), + context + " checkedKey=" + key); + assertEquals(actual.getOrDefault(key, missingValue), + expectedValue == null ? missingValue : expectedValue.longValue(), context + " checkedKey=" + key); + } + + Map actualEntries = new HashMap<>(); + actual.forEach(actualEntries::put); + assertEquals(actualEntries, expected, context); + } + + private static long randomSeed(String testName) { + String configuredSeed = System.getProperty("pulsar.collections.randomSeed"); + long seed = configuredSeed != null ? Long.parseLong(configuredSeed) : ThreadLocalRandom.current().nextLong(); + String message = Long2LongOpenHashMapTest.class.getSimpleName() + "." + testName + " seed=" + seed; + Reporter.log(message, true); + System.err.println(message); + return seed; + } + + private static long randomValue(Random random) { + return switch (random.nextInt(32)) { + case 0 -> 0L; + case 1 -> Long.MIN_VALUE; + case 2 -> Long.MAX_VALUE; + default -> random.nextInt(1_024) - 512L; + }; + } + + private static long randomLongWithEdgeCases(Random random, int bound) { + return switch (random.nextInt(64)) { + case 0 -> 0L; + case 1 -> Long.MIN_VALUE; + case 2 -> Long.MAX_VALUE; + default -> random.nextInt(bound) - bound / 2L; + }; + } + + private static List collidingLongKeys(int capacity, int count) { + int mask = capacity - 1; + int bucket = Long2ObjectOpenHashMap.hash(0) & mask; + List keys = new ArrayList<>(); + for (long candidate = 0; keys.size() < count; candidate++) { + if ((Long2ObjectOpenHashMap.hash(candidate) & mask) == bucket) { + keys.add(candidate); } } + return keys; + } + + private static long valueForIndex(int index) { + return index % 3 == 0 ? 0L : index * 101L - 17L; } }