From 6b62b03f054819637efa343f2d471d3e02ac69e2 Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Sun, 14 Jun 2026 17:27:40 +0800 Subject: [PATCH 1/4] [improve][broker] Pack PendingAcksMap values into long --- .../pulsar/broker/service/Consumer.java | 46 ++++-- .../pulsar/broker/service/PendingAcksMap.java | 139 +++++++++++++----- .../broker/service/PendingAcksMapTest.java | 112 +++++++++++++- 3 files changed, 244 insertions(+), 53 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 0f4708fa39fe7..e2f5e42f84b30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -83,6 +83,7 @@ public class Consumer { private static final Logger LOG = Logger.get(Consumer.class); + private static final int PENDING_ACK_NOT_FOUND = PendingAcksMap.PENDING_ACK_NOT_FOUND; private final Logger log; private final Subscription subscription; @@ -649,10 +650,10 @@ private CompletableFuture individualAck(CommandAck ack, Map addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); } } else if (!hasAckSet) { - IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet( + int removed = ackOwnerConsumer.removePendingAckAndGetRemainingUnacked( position.getLedgerId(), position.getEntryId()); - if (removed != null) { - addAndGetUnAckedMsgs(ackOwnerConsumer, -removed.leftInt()); + if (removed != PENDING_ACK_NOT_FOUND) { + addAndGetUnAckedMsgs(ackOwnerConsumer, -removed); updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); } } @@ -744,10 +745,10 @@ private void applyPendingAckCompletions(List pendingAckCom } } } else { - IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet( + int removed = ackOwnerConsumer.removePendingAckAndGetRemainingUnacked( position.getLedgerId(), position.getEntryId()); - if (removed != null) { - addAndGetUnAckedMsgs(ackOwnerConsumer, -removed.leftInt()); + if (removed != PENDING_ACK_NOT_FOUND) { + addAndGetUnAckedMsgs(ackOwnerConsumer, -removed); } } updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); @@ -841,16 +842,16 @@ private void checkAckValidationError(CommandAck ack, Position position) { */ private ObjectIntPair getAckOwnerConsumerAndBatchSize(long ledgerId, long entryId) { if (Subscription.isIndividualAckMode(subType)) { - IntIntPair pendingAck = getPendingAcks().get(ledgerId, entryId); - if (pendingAck != null) { - return ObjectIntPair.of(this, pendingAck.leftInt()); + int remainingUnacked = getPendingAcks().getRemainingUnacked(ledgerId, entryId); + if (remainingUnacked != PENDING_ACK_NOT_FOUND) { + return ObjectIntPair.of(this, remainingUnacked); } else { // If there are more consumers, this step will consume more CPU, and it should be optimized later. for (Consumer consumer : subscription.getConsumers()) { if (consumer != this) { - pendingAck = consumer.getPendingAcks().get(ledgerId, entryId); - if (pendingAck != null) { - return ObjectIntPair.of(consumer, pendingAck.leftInt()); + remainingUnacked = consumer.getPendingAcks().getRemainingUnacked(ledgerId, entryId); + if (remainingUnacked != PENDING_ACK_NOT_FOUND) { + return ObjectIntPair.of(consumer, remainingUnacked); } } } @@ -1192,6 +1193,20 @@ public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelt return true; } + /** + * Atomically remove the pending ack entry and return its stored values. + * + *

No-op if {@code pendingAcks} is not initialized. + * + * @return the remaining unacked count, or {@link PendingAcksMap#PENDING_ACK_NOT_FOUND} if not found + */ + public int removePendingAckAndGetRemainingUnacked(long ledgerId, long entryId) { + if (pendingAcks != null) { + return pendingAcks.removeAndGetRemainingUnacked(ledgerId, entryId); + } + return PENDING_ACK_NOT_FOUND; + } + /** * Atomically remove the pending ack entry and return its stored values. * @@ -1273,9 +1288,10 @@ public void redeliverUnacknowledgedMessages(List messageIds) { List pendingPositions = new ArrayList<>(); for (MessageIdData msg : messageIds) { Position position = PositionFactory.create(msg.getLedgerId(), msg.getEntryId()); - IntIntPair pendingAck = pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId()); - if (pendingAck != null) { - totalRedeliveryMessages += pendingAck.leftInt(); + int remainingUnacked = pendingAcks.removeAndGetRemainingUnacked( + position.getLedgerId(), position.getEntryId()); + if (remainingUnacked != PENDING_ACK_NOT_FOUND) { + totalRedeliveryMessages += remainingUnacked; pendingPositions.add(position); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 70f1a7dd247c8..8c806b2524861 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -39,6 +39,9 @@ * running. */ public class PendingAcksMap { + static final int PENDING_ACK_NOT_FOUND = -1; + private static final long STICKY_KEY_HASH_MASK = 0xFFFF_FFFFL; + /** * Callback interface for handling the addition of pending acknowledgments. */ @@ -97,7 +100,7 @@ public interface PendingAcksConsumer { } private final Consumer consumer; - private final TreeMap> pendingAcks; + private final TreeMap> pendingAcks; private final Supplier pendingAcksAddHandlerSupplier; private final Supplier pendingAcksRemoveHandlerSupplier; private final Lock readLock; @@ -143,9 +146,9 @@ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remaining && !pendingAcksAddHandler.handleAdding(consumer, ledgerId, entryId, stickyKeyHash)) { return false; } - TreeMap ledgerPendingAcks = + TreeMap ledgerPendingAcks = pendingAcks.computeIfAbsent(ledgerId, k -> new TreeMap<>()); - ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked, stickyKeyHash)); + ledgerPendingAcks.put(entryId, packPendingAckValue(remainingUnacked, stickyKeyHash)); return true; } finally { writeLock.unlock(); @@ -184,15 +187,15 @@ public void forEach(PendingAcksConsumer processor) { private void processPendingAcks(PendingAcksConsumer processor) { // this code uses for loops intentionally, don't refactor to use forEach // iterate the outer map - for (Map.Entry> entry : pendingAcks.entrySet()) { + for (Map.Entry> entry : pendingAcks.entrySet()) { long ledgerId = entry.getKey(); - TreeMap ledgerPendingAcks = entry.getValue(); + TreeMap ledgerPendingAcks = entry.getValue(); // iterate the inner map - for (Map.Entry e : ledgerPendingAcks.entrySet()) { + for (Map.Entry e : ledgerPendingAcks.entrySet()) { long entryId = e.getKey(); - IntIntPair batchSizeAndStickyKeyHash = e.getValue(); - processor.accept(ledgerId, entryId, batchSizeAndStickyKeyHash.leftInt(), - batchSizeAndStickyKeyHash.rightInt()); + long packedValue = e.getValue(); + processor.accept(ledgerId, entryId, unpackRemainingUnacked(packedValue), + unpackStickyKeyHash(packedValue)); } } } @@ -254,7 +257,7 @@ private void internalForEachAndClear(PendingAcksConsumer processor, boolean clos public boolean contains(long ledgerId, long entryId) { try { readLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + TreeMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -274,11 +277,31 @@ public boolean contains(long ledgerId, long entryId) { public IntIntPair get(long ledgerId, long entryId) { try { readLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + TreeMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return null; } - return ledgerMap.get(entryId); + Long packedValue = ledgerMap.get(entryId); + return packedValue == null ? null : unpackPendingAckValue(packedValue); + } finally { + readLock.unlock(); + } + } + + /** + * Get the remaining unacked count for the given ledger ID and entry ID. + * + * @return the remaining unacked count, or {@link #PENDING_ACK_NOT_FOUND} if not found + */ + int getRemainingUnacked(long ledgerId, long entryId) { + try { + readLock.lock(); + TreeMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return PENDING_ACK_NOT_FOUND; + } + Long packedValue = ledgerMap.get(entryId); + return packedValue == null ? PENDING_ACK_NOT_FOUND : unpackRemainingUnacked(packedValue); } finally { readLock.unlock(); } @@ -296,11 +319,11 @@ public IntIntPair get(long ledgerId, long entryId) { public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyHash) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + TreeMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } - boolean removed = ledgerMap.remove(entryId, IntIntPair.of(batchSize, stickyKeyHash)); + boolean removed = ledgerMap.remove(entryId, packPendingAckValue(batchSize, stickyKeyHash)); if (removed) { handleRemovePendingAck(ledgerId, entryId, stickyKeyHash); } @@ -325,16 +348,16 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + TreeMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } - IntIntPair current = ledgerMap.get(entryId); - if (current == null) { + Long packedValue = ledgerMap.get(entryId); + if (packedValue == null) { return false; } - int newRemaining = current.leftInt() - ackedDelta; - ledgerMap.put(entryId, IntIntPair.of(newRemaining, current.rightInt())); + int newRemaining = unpackRemainingUnacked(packedValue) - ackedDelta; + ledgerMap.put(entryId, packPendingAckValue(newRemaining, unpackStickyKeyHash(packedValue))); return true; } finally { writeLock.unlock(); @@ -351,15 +374,14 @@ public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelt public boolean remove(long ledgerId, long entryId) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + TreeMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } - IntIntPair removedEntry = ledgerMap.remove(entryId); + Long removedEntry = ledgerMap.remove(entryId); boolean removed = removedEntry != null; if (removed) { - int stickyKeyHash = removedEntry.rightInt(); - handleRemovePendingAck(ledgerId, entryId, stickyKeyHash); + handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); } if (removed && ledgerMap.isEmpty()) { pendingAcks.remove(ledgerId); @@ -382,18 +404,44 @@ public boolean remove(long ledgerId, long entryId) { public IntIntPair removeAndGet(long ledgerId, long entryId) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + TreeMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return null; } - IntIntPair removedEntry = ledgerMap.remove(entryId); + Long removedEntry = ledgerMap.remove(entryId); if (removedEntry != null) { - handleRemovePendingAck(ledgerId, entryId, removedEntry.rightInt()); + handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); } if (removedEntry != null && ledgerMap.isEmpty()) { pendingAcks.remove(ledgerId); } - return removedEntry; + return removedEntry == null ? null : unpackPendingAckValue(removedEntry); + } finally { + writeLock.unlock(); + } + } + + /** + * Atomically remove and return the remaining unacked count for the given ledger ID and entry ID. + * + * @return the remaining unacked count, or {@link #PENDING_ACK_NOT_FOUND} if not found + */ + int removeAndGetRemainingUnacked(long ledgerId, long entryId) { + try { + writeLock.lock(); + TreeMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return PENDING_ACK_NOT_FOUND; + } + Long removedEntry = ledgerMap.remove(entryId); + if (removedEntry == null) { + return PENDING_ACK_NOT_FOUND; + } + handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); + if (ledgerMap.isEmpty()) { + pendingAcks.remove(ledgerId); + } + return unpackRemainingUnacked(removedEntry); } finally { writeLock.unlock(); } @@ -440,30 +488,30 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } else { readLock.lock(); } - Iterator>> ledgerMapIterator = + Iterator>> ledgerMapIterator = pendingAcks.headMap(markDeleteLedgerId + 1).entrySet().iterator(); while (ledgerMapIterator.hasNext()) { - Map.Entry> entry = ledgerMapIterator.next(); + Map.Entry> entry = ledgerMapIterator.next(); long ledgerId = entry.getKey(); - TreeMap ledgerMap = entry.getValue(); - TreeMap ledgerMapHead; + TreeMap ledgerMap = entry.getValue(); + TreeMap ledgerMapHead; if (ledgerId == markDeleteLedgerId) { ledgerMapHead = new TreeMap<>(ledgerMap.headMap(markDeleteEntryId + 1)); } else { ledgerMapHead = ledgerMap; } - Iterator> entryMapIterator = + Iterator> entryMapIterator = ledgerMapHead.entrySet().iterator(); while (entryMapIterator.hasNext()) { - Map.Entry intIntPairEntry = entryMapIterator.next(); - long entryId = intIntPairEntry.getKey(); + Map.Entry pendingAckEntry = entryMapIterator.next(); + long entryId = pendingAckEntry.getKey(); if (!acquiredWriteLock) { retryWithWriteLock = true; return; } - IntIntPair value = intIntPairEntry.getValue(); - int batchSize = value.leftInt(); - int stickyKeyHash = value.rightInt(); + long packedValue = pendingAckEntry.getValue(); + int batchSize = unpackRemainingUnacked(packedValue); + int stickyKeyHash = unpackStickyKeyHash(packedValue); if (pendingAcksRemoveHandler != null) { if (!batchStarted) { pendingAcksRemoveHandler.startBatch(); @@ -503,6 +551,23 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } } + private static long packPendingAckValue(int remainingUnacked, int stickyKeyHash) { + return ((long) remainingUnacked << Integer.SIZE) | (stickyKeyHash & STICKY_KEY_HASH_MASK); + } + + private static IntIntPair unpackPendingAckValue(long packedValue) { + return IntIntPair.of(unpackRemainingUnacked(packedValue), unpackStickyKeyHash(packedValue)); + } + + private static int unpackRemainingUnacked(long packedValue) { + return (int) (packedValue >> Integer.SIZE); + } + + // Casting back to int restores the original signed sticky key hash. + private static int unpackStickyKeyHash(long packedValue) { + return (int) packedValue; + } + private void handleRemovePendingAck(long ledgerId, long entryId, int stickyKeyHash) { PendingAcksRemoveHandler pendingAcksRemoveHandler = pendingAcksRemoveHandlerSupplier.get(); if (pendingAcksRemoveHandler != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index 02bf098561c1d..57319621914b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -299,4 +299,114 @@ public void removeAndGet_InvokesRemoveHandler() { verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false); } -} \ No newline at end of file + + @Test + public void getRemainingUnacked_ReturnsStoredCount() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 0, -1); + pendingAcksMap.addPendingAckIfAllowed(1L, 2L, Integer.MAX_VALUE, Integer.MIN_VALUE); + + assertEquals(pendingAcksMap.getRemainingUnacked(1L, 1L), 0); + assertEquals(pendingAcksMap.getRemainingUnacked(1L, 2L), Integer.MAX_VALUE); + assertEquals(pendingAcksMap.getRemainingUnacked(1L, 3L), PendingAcksMap.PENDING_ACK_NOT_FOUND); + } + + @Test + public void removeAndGetRemainingUnacked_RemovesAndInvokesRemoveHandler() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap.PendingAcksRemoveHandler removeHandler = mock(PendingAcksMap.PendingAcksRemoveHandler.class); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> removeHandler); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 0, Integer.MIN_VALUE); + pendingAcksMap.addPendingAckIfAllowed(1L, 2L, Integer.MAX_VALUE, -1); + + assertEquals(pendingAcksMap.removeAndGetRemainingUnacked(1L, 1L), 0); + assertEquals(pendingAcksMap.removeAndGetRemainingUnacked(1L, 2L), Integer.MAX_VALUE); + assertEquals(pendingAcksMap.removeAndGetRemainingUnacked(1L, 3L), PendingAcksMap.PENDING_ACK_NOT_FOUND); + + verify(removeHandler).handleRemoving(consumer, 1L, 1L, Integer.MIN_VALUE, false); + verify(removeHandler).handleRemoving(consumer, 1L, 2L, -1, false); + assertEquals(pendingAcksMap.size(), 0); + } + + @Test + public void packedPendingAckFields_RoundTripThroughPublicAccessors() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + int[][] values = new int[][] { + {1, 123}, + {0, 0}, + {Integer.MAX_VALUE, Integer.MIN_VALUE}, + {Integer.MIN_VALUE, Integer.MAX_VALUE}, + {42, -123456789} + }; + + for (int i = 0; i < values.length; i++) { + pendingAcksMap.addPendingAckIfAllowed(10L + i, 100L + i, values[i][0], values[i][1]); + } + + List forEachValues = new ArrayList<>(); + pendingAcksMap.forEach((ledgerId, entryId, remainingUnacked, stickyKeyHash) -> + forEachValues.add(new int[] { + (int) ledgerId, (int) entryId, remainingUnacked, stickyKeyHash + })); + assertEquals(forEachValues.size(), values.length); + for (int i = 0; i < values.length; i++) { + assertPendingAck(pendingAcksMap.get(10L + i, 100L + i), values[i][0], values[i][1]); + assertEquals(forEachValues.get(i), new int[] {10 + i, 100 + i, values[i][0], values[i][1]}); + } + + for (int i = 0; i < values.length; i++) { + assertPendingAck(pendingAcksMap.removeAndGet(10L + i, 100L + i), values[i][0], values[i][1]); + } + assertEquals(pendingAcksMap.size(), 0); + } + + @Test + public void updateRemainingUnacked_PreservesStickyKeyHash() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 10, Integer.MIN_VALUE); + + assertTrue(pendingAcksMap.updateRemainingUnacked(1L, 1L, 4)); + + assertPendingAck(pendingAcksMap.get(1L, 1L), 6, Integer.MIN_VALUE); + } + + @Test + public void removeAllUpTo_PreservesPackedFieldsInCallback() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, Integer.MAX_VALUE, Integer.MIN_VALUE); + pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, -1); + pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 2, Integer.MAX_VALUE); + + List callbackInvocations = new ArrayList<>(); + pendingAcksMap.removeAllUpTo(1L, 2L, (ledgerId, entryId, remainingUnacked, stickyKeyHash) -> + callbackInvocations.add(new int[] { + (int) ledgerId, (int) entryId, remainingUnacked, stickyKeyHash + })); + + assertEquals(callbackInvocations.size(), 2); + assertEquals(callbackInvocations.get(0), new int[] {1, 1, Integer.MAX_VALUE, Integer.MIN_VALUE}); + assertEquals(callbackInvocations.get(1), new int[] {1, 2, 1, -1}); + assertTrue(pendingAcksMap.contains(2L, 1L)); + } + + @Test + public void removeWithValue_MatchesSignedPackedFields() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, Integer.MAX_VALUE, -1); + + assertFalse(pendingAcksMap.remove(1L, 1L, Integer.MAX_VALUE, 1)); + assertTrue(pendingAcksMap.remove(1L, 1L, Integer.MAX_VALUE, -1)); + assertFalse(pendingAcksMap.contains(1L, 1L)); + } + + private static void assertPendingAck(IntIntPair pendingAck, int remainingUnacked, int stickyKeyHash) { + assertTrue(pendingAck != null); + assertEquals(pendingAck.leftInt(), remainingUnacked); + assertEquals(pendingAck.rightInt(), stickyKeyHash); + } +} From eb44a26ace7b6c9879509fe3f13ddb18ac6eaea9 Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Wed, 17 Jun 2026 10:14:27 +0800 Subject: [PATCH 2/4] [improve][broker] Avoid pending ack value boxing --- microbench/build.gradle.kts | 1 + .../PendingAcksMapPackingBenchmark.java | 566 ++++++++++++++++++ .../pulsar/broker/service/package-info.java | 22 + .../pulsar/broker/service/Consumer.java | 4 + .../pulsar/broker/service/PendingAcksMap.java | 70 ++- .../broker/service/PendingAcksMapTest.java | 36 +- 6 files changed, 685 insertions(+), 14 deletions(-) create mode 100644 microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapPackingBenchmark.java create mode 100644 microbench/src/main/java/org/apache/pulsar/broker/service/package-info.java diff --git a/microbench/build.gradle.kts b/microbench/build.gradle.kts index 9cea5eb6bb8e9..216486255c485 100644 --- a/microbench/build.gradle.kts +++ b/microbench/build.gradle.kts @@ -29,6 +29,7 @@ dependencies { implementation(project(":pulsar-common")) api(project(":pulsar-broker")) implementation(libs.bookkeeper.server) + implementation(libs.fastutil) api(libs.guava) api("org.openjdk.jmh:jmh-core:1.37") annotationProcessor("org.openjdk.jmh:jmh-generator-annprocess:1.37") diff --git a/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapPackingBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapPackingBenchmark.java new file mode 100644 index 0000000000000..3314a3bea7e61 --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/service/PendingAcksMapPackingBenchmark.java @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import it.unimi.dsi.fastutil.ints.IntIntPair; +import it.unimi.dsi.fastutil.longs.Long2LongMap; +import it.unimi.dsi.fastutil.longs.Long2LongRBTreeMap; +import it.unimi.dsi.fastutil.longs.Long2LongSortedMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import it.unimi.dsi.fastutil.objects.ObjectBidirectionalIterator; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@BenchmarkMode(Mode.AverageTime) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS) +public class PendingAcksMapPackingBenchmark { + private static final int PENDING_ACK_NOT_FOUND = -1; + private static final long STICKY_KEY_HASH_MASK = 0xFFFF_FFFFL; + private static final long PACKED_PENDING_ACK_NOT_FOUND = packPendingAckValueUnchecked(PENDING_ACK_NOT_FOUND, 0); + + @Benchmark + public boolean addOrReplace(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + return state.store.addOrReplace(state.ledgerIds[index], state.entryIds[index], + remainingUnacked(index), stickyKeyHash(index)); + } + + @Benchmark + public boolean containsHit(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + return state.store.contains(state.ledgerIds[index], state.entryIds[index]); + } + + @Benchmark + public long forEachScan(MapState state) { + return state.store.forEachScan(); + } + + @Benchmark + public IntIntPair getPairHit(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + return state.store.get(state.ledgerIds[index], state.entryIds[index]); + } + + @Benchmark + public int getRemainingUnackedHit(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + return state.store.getRemainingUnacked(state.ledgerIds[index], state.entryIds[index]); + } + + @Benchmark + public long removeAllUpToBeforeFirstEntry(MapState state) { + return state.store.removeAllUpTo(0, -1); + } + + @Benchmark + public long removeAllUpToSmallPrefixAndRefill(PrefixRemoveState state) { + long removed = state.store.removeAllUpTo(0, state.prefixEntries - 1L); + for (int i = 0; i < state.prefixEntries; i++) { + state.store.addOrReplace(0, i, remainingUnacked(i), stickyKeyHash(i)); + } + return removed; + } + + @Benchmark + public int removeAndGetRemainingAndAdd(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + long ledgerId = state.ledgerIds[index]; + long entryId = state.entryIds[index]; + int remainingUnacked = state.store.removeAndGetRemainingUnacked(ledgerId, entryId); + state.store.addOrReplace(ledgerId, entryId, + remainingUnacked == PENDING_ACK_NOT_FOUND ? remainingUnacked(index) : remainingUnacked, + stickyKeyHash(index)); + return remainingUnacked; + } + + @Benchmark + public boolean removeWithValueAndAdd(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + long ledgerId = state.ledgerIds[index]; + long entryId = state.entryIds[index]; + int remainingUnacked = remainingUnacked(index); + int stickyKeyHash = stickyKeyHash(index); + boolean removed = state.store.remove(ledgerId, entryId, remainingUnacked, stickyKeyHash); + state.store.addOrReplace(ledgerId, entryId, remainingUnacked, stickyKeyHash); + return removed; + } + + @Benchmark + public boolean updateRemainingUnacked(MapState state, CursorState cursor) { + int index = cursor.next(state.entries); + return state.store.updateRemainingUnacked(state.ledgerIds[index], state.entryIds[index], 1); + } + + @State(Scope.Benchmark) + public static class MapState { + @Param({"FASTUTIL_OBJECT", "FASTUTIL_PACKED", "FASTUTIL_PACKED_SENTINEL"}) + private Implementation implementation; + + @Param({"50000"}) + private int entries; + + @Param({"1"}) + private int ledgers; + + private PendingAcksStore store; + private long[] ledgerIds; + private long[] entryIds; + + @Setup(Level.Trial) + public void setup() { + store = implementation.createStore(); + ledgerIds = new long[entries]; + entryIds = new long[entries]; + populate(store, entries, ledgers, ledgerIds, entryIds); + } + } + + @State(Scope.Thread) + public static class PrefixRemoveState { + @Param({"FASTUTIL_OBJECT", "FASTUTIL_PACKED", "FASTUTIL_PACKED_SENTINEL"}) + private Implementation implementation; + + @Param({"50000"}) + private int entries; + + private PendingAcksStore store; + private int prefixEntries; + + @Setup(Level.Trial) + public void setup() { + store = implementation.createStore(); + prefixEntries = Math.max(1, entries / 50); + populate(store, entries, 1, null, null); + } + } + + @State(Scope.Thread) + public static class CursorState { + private int cursor; + + int next(int bound) { + int next = cursor++; + if (cursor == bound) { + cursor = 0; + } + return next; + } + } + + public enum Implementation { + FASTUTIL_OBJECT { + @Override + PendingAcksStore createStore() { + return new FastutilObjectStore(); + } + }, + FASTUTIL_PACKED { + @Override + PendingAcksStore createStore() { + return new FastutilPackedStore(false); + } + }, + FASTUTIL_PACKED_SENTINEL { + @Override + PendingAcksStore createStore() { + return new FastutilPackedStore(true); + } + }; + + abstract PendingAcksStore createStore(); + } + + private interface PendingAcksStore { + boolean addOrReplace(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash); + + boolean contains(long ledgerId, long entryId); + + IntIntPair get(long ledgerId, long entryId); + + int getRemainingUnacked(long ledgerId, long entryId); + + boolean remove(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash); + + int removeAndGetRemainingUnacked(long ledgerId, long entryId); + + boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta); + + long forEachScan(); + + long removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId); + } + + private static final class FastutilObjectStore implements PendingAcksStore { + private final Long2ObjectSortedMap> pendingAcks = + new Long2ObjectRBTreeMap<>(); + + @Override + public boolean addOrReplace(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash) { + Long2ObjectSortedMap ledgerPendingAcks = + pendingAcks.computeIfAbsent(ledgerId, k -> new Long2ObjectRBTreeMap<>()); + ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked, stickyKeyHash)); + return true; + } + + @Override + public boolean contains(long ledgerId, long entryId) { + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); + return ledgerMap != null && ledgerMap.containsKey(entryId); + } + + @Override + public IntIntPair get(long ledgerId, long entryId) { + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); + return ledgerMap == null ? null : ledgerMap.get(entryId); + } + + @Override + public int getRemainingUnacked(long ledgerId, long entryId) { + IntIntPair value = get(ledgerId, entryId); + return value == null ? PENDING_ACK_NOT_FOUND : value.leftInt(); + } + + @Override + public boolean remove(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash) { + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); + IntIntPair value = ledgerMap == null ? null : ledgerMap.get(entryId); + if (value == null || value.leftInt() != remainingUnacked || value.rightInt() != stickyKeyHash) { + return false; + } + ledgerMap.remove(entryId); + removeLedgerIfEmpty(ledgerId, ledgerMap); + return true; + } + + @Override + public int removeAndGetRemainingUnacked(long ledgerId, long entryId) { + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return PENDING_ACK_NOT_FOUND; + } + IntIntPair removed = ledgerMap.remove(entryId); + if (removed == null) { + return PENDING_ACK_NOT_FOUND; + } + removeLedgerIfEmpty(ledgerId, ledgerMap); + return removed.leftInt(); + } + + @Override + public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); + IntIntPair value = ledgerMap == null ? null : ledgerMap.get(entryId); + if (value == null) { + return false; + } + int newRemaining = value.leftInt() - ackedDelta; + if (newRemaining < 0) { + return false; + } + ledgerMap.put(entryId, IntIntPair.of(newRemaining, value.rightInt())); + return true; + } + + @Override + public long forEachScan() { + long sum = 0; + for (Long2ObjectMap.Entry> entry : pendingAcks.long2ObjectEntrySet()) { + long ledgerId = entry.getLongKey(); + for (Long2ObjectMap.Entry pendingAckEntry : entry.getValue().long2ObjectEntrySet()) { + IntIntPair value = pendingAckEntry.getValue(); + sum += ledgerId + pendingAckEntry.getLongKey() + value.leftInt() + value.rightInt(); + } + } + return sum; + } + + @Override + public long removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) { + long removed = 0; + ObjectBidirectionalIterator>> ledgerIterator = + pendingAcks.headMap(markDeleteLedgerId + 1).long2ObjectEntrySet().iterator(); + while (ledgerIterator.hasNext()) { + Long2ObjectMap.Entry> entry = ledgerIterator.next(); + long ledgerId = entry.getLongKey(); + Long2ObjectSortedMap ledgerMap = entry.getValue(); + Long2ObjectSortedMap ledgerMapHead = + ledgerId == markDeleteLedgerId ? ledgerMap.headMap(markDeleteEntryId + 1) : ledgerMap; + ObjectBidirectionalIterator> entryIterator = + ledgerMapHead.long2ObjectEntrySet().iterator(); + while (entryIterator.hasNext()) { + entryIterator.next(); + entryIterator.remove(); + removed++; + } + if (ledgerMap.isEmpty()) { + ledgerIterator.remove(); + } + } + return removed; + } + + private void removeLedgerIfEmpty(long ledgerId, Long2ObjectSortedMap ledgerMap) { + if (ledgerMap.isEmpty()) { + pendingAcks.remove(ledgerId); + } + } + } + + private static final class FastutilPackedStore implements PendingAcksStore { + private final Long2ObjectSortedMap pendingAcks = new Long2ObjectRBTreeMap<>(); + private final boolean sentinelDefault; + + private FastutilPackedStore(boolean sentinelDefault) { + this.sentinelDefault = sentinelDefault; + } + + @Override + public boolean addOrReplace(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash) { + Long2LongSortedMap ledgerPendingAcks = pendingAcks.computeIfAbsent(ledgerId, k -> newLedgerPendingAcks()); + long packedValue = packPendingAckValue(remainingUnacked, stickyKeyHash); + ledgerPendingAcks.put(entryId, packedValue); + return true; + } + + @Override + public boolean contains(long ledgerId, long entryId) { + Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); + return ledgerMap != null && ledgerMap.containsKey(entryId); + } + + @Override + public IntIntPair get(long ledgerId, long entryId) { + Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return null; + } + if (sentinelDefault) { + long packedValue = ledgerMap.get(entryId); + return packedValue == PACKED_PENDING_ACK_NOT_FOUND ? null : unpackPendingAckValue(packedValue); + } + if (!ledgerMap.containsKey(entryId)) { + return null; + } + return unpackPendingAckValue(ledgerMap.get(entryId)); + } + + @Override + public int getRemainingUnacked(long ledgerId, long entryId) { + Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return PENDING_ACK_NOT_FOUND; + } + if (sentinelDefault) { + long packedValue = ledgerMap.get(entryId); + return packedValue == PACKED_PENDING_ACK_NOT_FOUND + ? PENDING_ACK_NOT_FOUND : unpackRemainingUnacked(packedValue); + } + if (!ledgerMap.containsKey(entryId)) { + return PENDING_ACK_NOT_FOUND; + } + return unpackRemainingUnacked(ledgerMap.get(entryId)); + } + + @Override + public boolean remove(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash) { + if (remainingUnacked < 0) { + return false; + } + Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return false; + } + long expectedValue = packPendingAckValue(remainingUnacked, stickyKeyHash); + if (sentinelDefault) { + if (ledgerMap.get(entryId) != expectedValue) { + return false; + } + } else if (!ledgerMap.containsKey(entryId) || ledgerMap.get(entryId) != expectedValue) { + return false; + } + ledgerMap.remove(entryId); + removeLedgerIfEmpty(ledgerId, ledgerMap); + return true; + } + + @Override + public int removeAndGetRemainingUnacked(long ledgerId, long entryId) { + Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return PENDING_ACK_NOT_FOUND; + } + if (sentinelDefault) { + long removed = ledgerMap.remove(entryId); + if (removed == PACKED_PENDING_ACK_NOT_FOUND) { + return PENDING_ACK_NOT_FOUND; + } + removeLedgerIfEmpty(ledgerId, ledgerMap); + return unpackRemainingUnacked(removed); + } + if (!ledgerMap.containsKey(entryId)) { + return PENDING_ACK_NOT_FOUND; + } + long removed = ledgerMap.remove(entryId); + removeLedgerIfEmpty(ledgerId, ledgerMap); + return unpackRemainingUnacked(removed); + } + + @Override + public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { + Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return false; + } + long packedValue; + if (sentinelDefault) { + packedValue = ledgerMap.get(entryId); + if (packedValue == PACKED_PENDING_ACK_NOT_FOUND) { + return false; + } + } else { + if (!ledgerMap.containsKey(entryId)) { + return false; + } + packedValue = ledgerMap.get(entryId); + } + int newRemaining = unpackRemainingUnacked(packedValue) - ackedDelta; + if (newRemaining < 0) { + return false; + } + ledgerMap.put(entryId, packPendingAckValue(newRemaining, unpackStickyKeyHash(packedValue))); + return true; + } + + @Override + public long forEachScan() { + long sum = 0; + for (Long2ObjectMap.Entry entry : pendingAcks.long2ObjectEntrySet()) { + long ledgerId = entry.getLongKey(); + for (Long2LongMap.Entry pendingAckEntry : entry.getValue().long2LongEntrySet()) { + long packedValue = pendingAckEntry.getLongValue(); + sum += ledgerId + pendingAckEntry.getLongKey() + + unpackRemainingUnacked(packedValue) + unpackStickyKeyHash(packedValue); + } + } + return sum; + } + + @Override + public long removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) { + long removed = 0; + ObjectBidirectionalIterator> ledgerIterator = + pendingAcks.headMap(markDeleteLedgerId + 1).long2ObjectEntrySet().iterator(); + while (ledgerIterator.hasNext()) { + Long2ObjectMap.Entry entry = ledgerIterator.next(); + long ledgerId = entry.getLongKey(); + Long2LongSortedMap ledgerMap = entry.getValue(); + Long2LongSortedMap ledgerMapHead = + ledgerId == markDeleteLedgerId ? ledgerMap.headMap(markDeleteEntryId + 1) : ledgerMap; + ObjectBidirectionalIterator entryIterator = + ledgerMapHead.long2LongEntrySet().iterator(); + while (entryIterator.hasNext()) { + entryIterator.next(); + entryIterator.remove(); + removed++; + } + if (ledgerMap.isEmpty()) { + ledgerIterator.remove(); + } + } + return removed; + } + + private Long2LongSortedMap newLedgerPendingAcks() { + Long2LongRBTreeMap ledgerPendingAcks = new Long2LongRBTreeMap(); + if (sentinelDefault) { + ledgerPendingAcks.defaultReturnValue(PACKED_PENDING_ACK_NOT_FOUND); + } + return ledgerPendingAcks; + } + + private void removeLedgerIfEmpty(long ledgerId, Long2LongSortedMap ledgerMap) { + if (ledgerMap.isEmpty()) { + pendingAcks.remove(ledgerId); + } + } + } + + private static void populate(PendingAcksStore store, int entries, int ledgers, long[] ledgerIds, long[] entryIds) { + int index = 0; + for (int ledger = 0; ledger < ledgers; ledger++) { + int entriesInLedger = entries / ledgers + (ledger < entries % ledgers ? 1 : 0); + for (int entry = 0; entry < entriesInLedger; entry++) { + if (ledgerIds != null) { + ledgerIds[index] = ledger; + entryIds[index] = entry; + } + store.addOrReplace(ledger, entry, remainingUnacked(index), stickyKeyHash(index)); + index++; + } + } + } + + private static int remainingUnacked(int index) { + return 1_000_000_000 - (index & 1023); + } + + private static int stickyKeyHash(int index) { + return index * 0x9E3779B9; + } + + private static long packPendingAckValue(int remainingUnacked, int stickyKeyHash) { + if (remainingUnacked < 0) { + throw new IllegalArgumentException("remainingUnacked must be non-negative"); + } + return packPendingAckValueUnchecked(remainingUnacked, stickyKeyHash); + } + + private static long packPendingAckValueUnchecked(int remainingUnacked, int stickyKeyHash) { + return ((long) remainingUnacked << Integer.SIZE) | (stickyKeyHash & STICKY_KEY_HASH_MASK); + } + + private static IntIntPair unpackPendingAckValue(long packedValue) { + return IntIntPair.of(unpackRemainingUnacked(packedValue), unpackStickyKeyHash(packedValue)); + } + + private static int unpackRemainingUnacked(long packedValue) { + return (int) (packedValue >> Integer.SIZE); + } + + private static int unpackStickyKeyHash(long packedValue) { + return (int) packedValue; + } +} diff --git a/microbench/src/main/java/org/apache/pulsar/broker/service/package-info.java b/microbench/src/main/java/org/apache/pulsar/broker/service/package-info.java new file mode 100644 index 0000000000000..ee8034bd6ed1a --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/service/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Benchmarks for broker service internals. + */ +package org.apache.pulsar.broker.service; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 7b643795580c1..50c2dd67a0922 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -1214,7 +1214,11 @@ public int removePendingAckAndGetRemainingUnacked(long ledgerId, long entryId) { * * @return the removed {@link IntIntPair#leftInt() remainingUnacked} and * {@link IntIntPair#rightInt() stickyKeyHash}, or {@code null} if not found + * @deprecated use {@link #removePendingAckAndGetRemainingUnacked(long, long)} when only the remaining unacked + * count is needed. This method materializes an {@link IntIntPair}. */ + @Deprecated + @SuppressWarnings("deprecation") public IntIntPair removePendingAckAndGet(long ledgerId, long entryId) { if (pendingAcks != null) { return pendingAcks.removeAndGet(ledgerId, entryId); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 319ff7197c16a..75e3e61abde9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -45,6 +45,10 @@ public class PendingAcksMap { static final int PENDING_ACK_NOT_FOUND = -1; private static final long STICKY_KEY_HASH_MASK = 0xFFFF_FFFFL; + // The remaining unacked count is a non-negative message count. Reserve a packed negative count as the + // Long2LongSortedMap default return value so lookups can distinguish missing entries with a single get. + private static final long PACKED_PENDING_ACK_NOT_FOUND = + packPendingAckValueUnchecked(PENDING_ACK_NOT_FOUND, 0); /** * Callback interface for handling the addition of pending acknowledgments. @@ -137,6 +141,7 @@ public interface PendingAcksConsumer { * @return true if the pending ack was added, and it's allowed to send a message, false otherwise */ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash) { + long packedValue = packPendingAckValue(remainingUnacked, stickyKeyHash); try { writeLock.lock(); // prevent adding sticky hash to pending acks if the PendingAcksMap has already been closed @@ -152,10 +157,9 @@ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remaining return false; } Long2LongSortedMap ledgerPendingAcks = - pendingAcks.computeIfAbsent(ledgerId, k -> new Long2LongRBTreeMap()); - boolean containsEntry = ledgerPendingAcks.containsKey(entryId); - ledgerPendingAcks.put(entryId, packPendingAckValue(remainingUnacked, stickyKeyHash)); - if (!containsEntry) { + pendingAcks.computeIfAbsent(ledgerId, k -> newLedgerPendingAcks()); + long previous = ledgerPendingAcks.put(entryId, packedValue); + if (previous == PACKED_PENDING_ACK_NOT_FOUND) { size++; } return true; @@ -278,15 +282,19 @@ public boolean contains(long ledgerId, long entryId) { * @param ledgerId the ledger ID * @param entryId the entry ID * @return the pending ack, or null if not found + * @deprecated use {@link #getRemainingUnacked(long, long)} when only the remaining unacked count is needed. + * This method materializes an {@link IntIntPair}. */ + @Deprecated public IntIntPair get(long ledgerId, long entryId) { try { readLock.lock(); Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); - if (ledgerMap == null || !ledgerMap.containsKey(entryId)) { + if (ledgerMap == null) { return null; } - return unpackPendingAckValue(ledgerMap.get(entryId)); + long packedValue = ledgerMap.get(entryId); + return packedValue == PACKED_PENDING_ACK_NOT_FOUND ? null : unpackPendingAckValue(packedValue); } finally { readLock.unlock(); } @@ -301,10 +309,12 @@ int getRemainingUnacked(long ledgerId, long entryId) { try { readLock.lock(); Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); - if (ledgerMap == null || !ledgerMap.containsKey(entryId)) { + if (ledgerMap == null) { return PENDING_ACK_NOT_FOUND; } - return unpackRemainingUnacked(ledgerMap.get(entryId)); + long packedValue = ledgerMap.get(entryId); + return packedValue == PACKED_PENDING_ACK_NOT_FOUND + ? PENDING_ACK_NOT_FOUND : unpackRemainingUnacked(packedValue); } finally { readLock.unlock(); } @@ -323,8 +333,11 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH try { writeLock.lock(); Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); + if (batchSize < 0) { + return false; + } long expectedValue = packPendingAckValue(batchSize, stickyKeyHash); - if (ledgerMap == null || !ledgerMap.containsKey(entryId) || ledgerMap.get(entryId) != expectedValue) { + if (ledgerMap == null || ledgerMap.get(entryId) != expectedValue) { return false; } ledgerMap.remove(entryId); @@ -352,11 +365,17 @@ public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelt try { writeLock.lock(); Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); - if (ledgerMap == null || !ledgerMap.containsKey(entryId)) { + if (ledgerMap == null) { return false; } long packedValue = ledgerMap.get(entryId); + if (packedValue == PACKED_PENDING_ACK_NOT_FOUND) { + return false; + } int newRemaining = unpackRemainingUnacked(packedValue) - ackedDelta; + if (newRemaining < 0) { + return false; + } ledgerMap.put(entryId, packPendingAckValue(newRemaining, unpackStickyKeyHash(packedValue))); return true; } finally { @@ -375,10 +394,13 @@ public boolean remove(long ledgerId, long entryId) { try { writeLock.lock(); Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); - if (ledgerMap == null || !ledgerMap.containsKey(entryId)) { + if (ledgerMap == null) { return false; } long removedEntry = ledgerMap.remove(entryId); + if (removedEntry == PACKED_PENDING_ACK_NOT_FOUND) { + return false; + } size--; handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); if (ledgerMap.isEmpty()) { @@ -398,15 +420,21 @@ public boolean remove(long ledgerId, long entryId) { * @param ledgerId the ledger ID * @param entryId the entry ID * @return the removed entry as an IntIntPair (batchSize, stickyKeyHash), or null if not found + * @deprecated use {@link #removeAndGetRemainingUnacked(long, long)} when only the remaining unacked count + * is needed. This method materializes an {@link IntIntPair}. */ + @Deprecated public IntIntPair removeAndGet(long ledgerId, long entryId) { try { writeLock.lock(); Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); - if (ledgerMap == null || !ledgerMap.containsKey(entryId)) { + if (ledgerMap == null) { return null; } long removedEntry = ledgerMap.remove(entryId); + if (removedEntry == PACKED_PENDING_ACK_NOT_FOUND) { + return null; + } size--; handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); if (ledgerMap.isEmpty()) { @@ -427,10 +455,13 @@ int removeAndGetRemainingUnacked(long ledgerId, long entryId) { try { writeLock.lock(); Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); - if (ledgerMap == null || !ledgerMap.containsKey(entryId)) { + if (ledgerMap == null) { return PENDING_ACK_NOT_FOUND; } long removedEntry = ledgerMap.remove(entryId); + if (removedEntry == PACKED_PENDING_ACK_NOT_FOUND) { + return PENDING_ACK_NOT_FOUND; + } size--; handleRemovePendingAck(ledgerId, entryId, unpackStickyKeyHash(removedEntry)); if (ledgerMap.isEmpty()) { @@ -544,9 +575,22 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } private static long packPendingAckValue(int remainingUnacked, int stickyKeyHash) { + if (remainingUnacked < 0) { + throw new IllegalArgumentException("remainingUnacked must be non-negative"); + } + return packPendingAckValueUnchecked(remainingUnacked, stickyKeyHash); + } + + private static long packPendingAckValueUnchecked(int remainingUnacked, int stickyKeyHash) { return ((long) remainingUnacked << Integer.SIZE) | (stickyKeyHash & STICKY_KEY_HASH_MASK); } + private static Long2LongSortedMap newLedgerPendingAcks() { + Long2LongRBTreeMap ledgerPendingAcks = new Long2LongRBTreeMap(); + ledgerPendingAcks.defaultReturnValue(PACKED_PENDING_ACK_NOT_FOUND); + return ledgerPendingAcks; + } + private static IntIntPair unpackPendingAckValue(long packedValue) { return IntIntPair.of(unpackRemainingUnacked(packedValue), unpackStickyKeyHash(packedValue)); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index 42fd5f7c7207c..edf9758188056 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -29,11 +29,13 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; import it.unimi.dsi.fastutil.ints.IntIntPair; import java.util.ArrayList; import java.util.List; import org.testng.annotations.Test; +@SuppressWarnings("deprecation") public class PendingAcksMapTest { @Test public void addPendingAckIfAllowed_AddsAckWhenAllowed() { @@ -369,7 +371,7 @@ public void packedPendingAckFields_RoundTripThroughPublicAccessors() { {1, 123}, {0, 0}, {Integer.MAX_VALUE, Integer.MIN_VALUE}, - {Integer.MIN_VALUE, Integer.MAX_VALUE}, + {2, Integer.MAX_VALUE}, {42, -123456789} }; @@ -405,6 +407,17 @@ public void updateRemainingUnacked_PreservesStickyKeyHash() { assertPendingAck(pendingAcksMap.get(1L, 1L), 6, Integer.MIN_VALUE); } + @Test + public void updateRemainingUnacked_DoesNotStoreNegativeRemainingUnacked() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123); + + assertFalse(pendingAcksMap.updateRemainingUnacked(1L, 1L, 2)); + + assertPendingAck(pendingAcksMap.get(1L, 1L), 1, 123); + } + @Test public void removeAllUpTo_PreservesPackedFieldsInCallback() { Consumer consumer = createMockConsumer("consumer1"); @@ -436,6 +449,27 @@ public void removeWithValue_MatchesSignedPackedFields() { assertFalse(pendingAcksMap.contains(1L, 1L)); } + @Test + public void addPendingAckIfAllowed_RejectsNegativeRemainingUnacked() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + + expectThrows(IllegalArgumentException.class, + () -> pendingAcksMap.addPendingAckIfAllowed(1L, 1L, PendingAcksMap.PENDING_ACK_NOT_FOUND, 0)); + } + + @Test + public void removeWithValue_RejectsNegativeRemainingUnackedWithoutRemovingEntry() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 0); + + assertFalse(pendingAcksMap.remove(1L, 1L, PendingAcksMap.PENDING_ACK_NOT_FOUND, 0)); + + assertTrue(pendingAcksMap.contains(1L, 1L)); + assertEquals(pendingAcksMap.getRemainingUnacked(1L, 1L), 1); + } + private static void assertPendingAck(IntIntPair pendingAck, int remainingUnacked, int stickyKeyHash) { assertTrue(pendingAck != null); assertEquals(pendingAck.leftInt(), remainingUnacked); From c5db155f945238fa071d286af979f58ce391d170 Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Wed, 17 Jun 2026 12:48:13 +0800 Subject: [PATCH 3/4] [improve][broker] Cover pending ack sentinel boundaries --- .../broker/service/PendingAcksMapTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index edf9758188056..5b8cbc75e3254 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -360,6 +361,7 @@ public void removeAndGetRemainingUnacked_RemovesAndInvokesRemoveHandler() { verify(removeHandler).handleRemoving(consumer, 1L, 1L, Integer.MIN_VALUE, false); verify(removeHandler).handleRemoving(consumer, 1L, 2L, -1, false); + verifyNoMoreInteractions(removeHandler); assertEquals(pendingAcksMap.size(), 0); } @@ -407,6 +409,19 @@ public void updateRemainingUnacked_PreservesStickyKeyHash() { assertPendingAck(pendingAcksMap.get(1L, 1L), 6, Integer.MIN_VALUE); } + @Test + public void updateRemainingUnacked_AllowsZeroRemainingUnacked() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, -1); + + assertTrue(pendingAcksMap.updateRemainingUnacked(1L, 1L, 1)); + + assertEquals(pendingAcksMap.getRemainingUnacked(1L, 1L), 0); + assertPendingAck(pendingAcksMap.get(1L, 1L), 0, -1); + assertEquals(pendingAcksMap.size(), 1); + } + @Test public void updateRemainingUnacked_DoesNotStoreNegativeRemainingUnacked() { Consumer consumer = createMockConsumer("consumer1"); @@ -456,6 +471,9 @@ public void addPendingAckIfAllowed_RejectsNegativeRemainingUnacked() { expectThrows(IllegalArgumentException.class, () -> pendingAcksMap.addPendingAckIfAllowed(1L, 1L, PendingAcksMap.PENDING_ACK_NOT_FOUND, 0)); + + assertFalse(pendingAcksMap.contains(1L, 1L)); + assertEquals(pendingAcksMap.size(), 0); } @Test From 31e1d3904967c831149f450a72af1f27e95d55a4 Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Wed, 17 Jun 2026 13:59:38 +0800 Subject: [PATCH 4/4] [improve][broker] Keep pending ack packing behavior unchanged --- .../pulsar/broker/service/PendingAcksMap.java | 12 +-------- .../broker/service/PendingAcksMapTest.java | 25 ------------------- 2 files changed, 1 insertion(+), 36 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 75e3e61abde9b..fee9a1348057e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -48,7 +48,7 @@ public class PendingAcksMap { // The remaining unacked count is a non-negative message count. Reserve a packed negative count as the // Long2LongSortedMap default return value so lookups can distinguish missing entries with a single get. private static final long PACKED_PENDING_ACK_NOT_FOUND = - packPendingAckValueUnchecked(PENDING_ACK_NOT_FOUND, 0); + packPendingAckValue(PENDING_ACK_NOT_FOUND, 0); /** * Callback interface for handling the addition of pending acknowledgments. @@ -333,9 +333,6 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH try { writeLock.lock(); Long2LongSortedMap ledgerMap = pendingAcks.get(ledgerId); - if (batchSize < 0) { - return false; - } long expectedValue = packPendingAckValue(batchSize, stickyKeyHash); if (ledgerMap == null || ledgerMap.get(entryId) != expectedValue) { return false; @@ -575,13 +572,6 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } private static long packPendingAckValue(int remainingUnacked, int stickyKeyHash) { - if (remainingUnacked < 0) { - throw new IllegalArgumentException("remainingUnacked must be non-negative"); - } - return packPendingAckValueUnchecked(remainingUnacked, stickyKeyHash); - } - - private static long packPendingAckValueUnchecked(int remainingUnacked, int stickyKeyHash) { return ((long) remainingUnacked << Integer.SIZE) | (stickyKeyHash & STICKY_KEY_HASH_MASK); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index 5b8cbc75e3254..311937aebf4a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -30,7 +30,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.expectThrows; import it.unimi.dsi.fastutil.ints.IntIntPair; import java.util.ArrayList; import java.util.List; @@ -464,30 +463,6 @@ public void removeWithValue_MatchesSignedPackedFields() { assertFalse(pendingAcksMap.contains(1L, 1L)); } - @Test - public void addPendingAckIfAllowed_RejectsNegativeRemainingUnacked() { - Consumer consumer = createMockConsumer("consumer1"); - PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); - - expectThrows(IllegalArgumentException.class, - () -> pendingAcksMap.addPendingAckIfAllowed(1L, 1L, PendingAcksMap.PENDING_ACK_NOT_FOUND, 0)); - - assertFalse(pendingAcksMap.contains(1L, 1L)); - assertEquals(pendingAcksMap.size(), 0); - } - - @Test - public void removeWithValue_RejectsNegativeRemainingUnackedWithoutRemovingEntry() { - Consumer consumer = createMockConsumer("consumer1"); - PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); - pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 0); - - assertFalse(pendingAcksMap.remove(1L, 1L, PendingAcksMap.PENDING_ACK_NOT_FOUND, 0)); - - assertTrue(pendingAcksMap.contains(1L, 1L)); - assertEquals(pendingAcksMap.getRemainingUnacked(1L, 1L), 1); - } - private static void assertPendingAck(IntIntPair pendingAck, int remainingUnacked, int stickyKeyHash) { assertTrue(pendingAck != null); assertEquals(pendingAck.leftInt(), remainingUnacked);