From fa30d367273b47914473c3b8a8c1cc2703955bcd Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 15 Jun 2026 15:56:50 +0300 Subject: [PATCH 01/10] [cleanup][misc] Restore fastutil, remove custom primitive collections (revert #25413) ### Motivation Reverts #25413, which had replaced fastutil with hand-written primitive collections (Int2ObjectOpenHashMap, IntIntPair, Long2ObjectOpenHashMap, ...). Those custom collections are a maintenance overhead for the project and have been a source of bugs in the past, so the broker and client code goes back to using fastutil. ### Modifications - Restore fastutil usage in NegativeAcksTracker, PendingAcksMap, Consumer, DrainingHashesTracker, InMemoryRedeliveryTracker, InMemoryDelayedDeliveryTracker and PersistentStickyKeyDispatcherMultipleConsumers (and their tests). - Keep the post-#25413 improvements: Roaring bitmap usage, the delayed-delivery race fixes and the PendingAcksMap O(1) size optimization (#26019); only the collection types are swapped back, preferring fastutil primitive sorted maps (Long2ObjectRBTreeMap/AVLTreeMap) over java.util.TreeMap. - Delete the custom org.apache.pulsar.common.util.collections.* classes and tests. - Re-add the fastutil dependency to pulsar-broker and pulsar-client, the version catalog, and the binary LICENSE files. Assisted-by: Claude Code --- .../server/src/assemble/LICENSE.bin.txt | 1 + .../shell/src/assemble/LICENSE.bin.txt | 1 + gradle/libs.versions.toml | 2 + pulsar-broker/build.gradle.kts | 1 + .../InMemoryDelayedDeliveryTracker.java | 30 +- .../pulsar/broker/service/Consumer.java | 4 +- .../broker/service/DrainingHashesTracker.java | 2 +- .../service/InMemoryRedeliveryTracker.java | 8 +- .../pulsar/broker/service/PendingAcksMap.java | 65 ++-- ...tStickyKeyDispatcherMultipleConsumers.java | 2 +- .../delayed/InMemoryDeliveryTrackerTest.java | 2 +- .../broker/service/PendingAcksMapTest.java | 2 +- pulsar-client/build.gradle.kts | 1 + .../client/impl/NegativeAcksTracker.java | 27 +- .../collections/Int2ObjectOpenHashMap.java | 193 ------------ .../common/util/collections/IntIntPair.java | 28 -- .../util/collections/IntOpenHashSet.java | 123 -------- .../common/util/collections/Long2IntMap.java | 83 ------ .../util/collections/Long2IntOpenHashMap.java | 190 ------------ .../util/collections/Long2ObjectMap.java | 98 ------- .../collections/Long2ObjectOpenHashMap.java | 277 ------------------ .../util/collections/LongObjConsumer.java | 27 -- .../util/collections/LongOpenHashSet.java | 159 ---------- .../util/collections/ObjectIntPair.java | 28 -- .../common/util/collections/HashSetTest.java | 105 ------- .../Int2ObjectOpenHashMapTest.java | 88 ------ .../collections/Long2IntOpenHashMapTest.java | 100 ------- .../Long2ObjectOpenHashMapTest.java | 179 ----------- 28 files changed, 79 insertions(+), 1747 deletions(-) delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMap.java delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntIntPair.java delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntOpenHashSet.java delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntMap.java delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMap.java delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectMap.java delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMap.java delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongObjConsumer.java delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongOpenHashSet.java delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ObjectIntPair.java delete mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/HashSetTest.java delete mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMapTest.java delete mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMapTest.java delete mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMapTest.java diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index b330e1cea86e6..07c779abf65ab 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -262,6 +262,7 @@ The Apache Software License, Version 2.0 - com.fasterxml.jackson.module-jackson-module-parameter-names-2.21.3.jar * Caffeine -- com.github.ben-manes.caffeine-caffeine-3.2.4.jar * Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar + * Fastutil -- it.unimi.dsi-fastutil-8.5.18.jar * LMAX Disruptor -- com.lmax-disruptor-4.0.0.jar * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.63.2.jar * Bitbucket -- org.bitbucket.b_c-jose4j-0.9.6.jar diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 115851264bfdb..4f000e2c92a8b 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -427,6 +427,7 @@ The Apache Software License, Version 2.0 * RE2j -- re2j-1.8.jar * Spotify completable-futures -- completable-futures-0.3.6.jar * RoaringBitmap -- RoaringBitmap-1.6.9.jar + * Fastutil -- fastutil-8.5.18.jar * JSpecify -- jspecify-1.0.0.jar * JetBrains Annotations -- annotations-26.1.0.jar diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e0c801ed65452..544289017647a 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -80,6 +80,7 @@ opentelemetry-gcp-resources = "1.57.0-alpha" # Data structures / Utils guava = "33.6.0-jre" caffeine = "3.2.4" +fastutil = "8.5.18" jctools = "4.0.6" roaringbitmap = "1.6.9" hppc = "0.9.1" @@ -330,6 +331,7 @@ rocksdbjni = { module = "org.rocksdb:rocksdbjni", version.ref = "rocksdb" } error-prone-annotations = { module = "com.google.errorprone:error_prone_annotations", version.ref = "errorprone" } # Data structures caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref = "caffeine" } +fastutil = { module = "it.unimi.dsi:fastutil", version.ref = "fastutil" } jctools-core = { module = "org.jctools:jctools-core", version.ref = "jctools" } jctools-core-jdk11 = { module = "org.jctools:jctools-core-jdk11", version.ref = "jctools" } roaringbitmap = { module = "org.roaringbitmap:RoaringBitmap", version.ref = "roaringbitmap" } diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts index 6fd3778c0c388..c177133647880 100644 --- a/pulsar-broker/build.gradle.kts +++ b/pulsar-broker/build.gradle.kts @@ -49,6 +49,7 @@ dependencies { api(libs.commons.lang3) api(libs.netty.transport) implementation(libs.protobuf.java) + implementation(libs.fastutil) implementation(libs.curator.recipes) implementation(libs.bookkeeper.stream.storage.server) { exclude(group = "org.apache.bookkeeper") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 8e9b8343704ea..c2e1c63b40020 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -21,9 +21,14 @@ import com.google.common.annotations.VisibleForTesting; import io.github.merlimat.slog.Logger; import io.netty.util.Timer; +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; import java.time.Clock; import java.util.NavigableSet; -import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -31,7 +36,6 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; -import org.apache.pulsar.common.util.collections.LongOpenHashSet; import org.roaringbitmap.longlong.Roaring64Bitmap; public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { @@ -40,9 +44,9 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack protected final Logger log; // timestamp -> ledgerId -> entryId - // TreeMap -> TreeMap -> RoaringBitmap - protected final TreeMap> - delayedMessageMap = new TreeMap<>(); + // AVL tree -> OpenHashMap -> RoaringBitmap + protected final Long2ObjectSortedMap> + delayedMessageMap = new Long2ObjectAVLTreeMap<>(); // If we detect that all messages have fixed delay time, such that the delivery is // always going to be in FIFO order, then we can avoid pulling all the messages in @@ -137,7 +141,7 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { .log("Add message"); long timestamp = roundTimestamp(deliverAt); - Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, k -> new TreeMap<>()) + Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); // Roaring64Bitmap does not store duplicates, so track if it a new element // so we can keep delayedMessagesCount in sync @@ -194,7 +198,7 @@ private void checkAndUpdateHighest(long deliverAt) { @Override public boolean hasMessageAvailable() { boolean hasMessageAvailable = !delayedMessageMap.isEmpty() - && delayedMessageMap.firstKey() <= getCutoffTime(); + && delayedMessageMap.firstLongKey() <= getCutoffTime(); if (!hasMessageAvailable) { updateTimer(); } @@ -211,15 +215,15 @@ public NavigableSet getScheduledMessages(int maxMessages) { long cutoffTime = getCutoffTime(); while (n > 0 && !delayedMessageMap.isEmpty()) { - long timestamp = delayedMessageMap.firstKey(); + long timestamp = delayedMessageMap.firstLongKey(); if (timestamp > cutoffTime) { break; } - LongOpenHashSet ledgerIdToDelete = new LongOpenHashSet(); - TreeMap ledgerMap = delayedMessageMap.get(timestamp); - for (var ledgerEntry : ledgerMap.entrySet()) { - long ledgerId = ledgerEntry.getKey(); + LongSet ledgerIdToDelete = new LongOpenHashSet(); + Long2ObjectSortedMap ledgerMap = delayedMessageMap.get(timestamp); + for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + long ledgerId = ledgerEntry.getLongKey(); Roaring64Bitmap entryIds = ledgerEntry.getValue(); long cardinality = entryIds.getLongCardinality(); if (cardinality <= n) { @@ -309,6 +313,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead } protected long nextDeliveryTime() { - return delayedMessageMap.firstKey(); + return delayedMessageMap.firstLongKey(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 0f4708fa39fe7..0f8f313c6a298 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -28,6 +28,8 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.opentelemetry.api.common.Attributes; +import it.unimi.dsi.fastutil.ints.IntIntPair; +import it.unimi.dsi.fastutil.objects.ObjectIntPair; import java.time.Instant; import java.util.ArrayList; import java.util.BitSet; @@ -72,8 +74,6 @@ import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.IntIntPair; -import org.apache.pulsar.common.util.collections.ObjectIntPair; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.transaction.common.exception.TransactionConflictException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 9edc88f88cf9e..5393cf4d6a6a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -32,7 +33,6 @@ import org.apache.pulsar.common.policies.data.DrainingHash; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl; -import org.apache.pulsar.common.util.collections.Int2ObjectOpenHashMap; import org.roaringbitmap.RoaringBitmap; /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java index 0d684eb17361b..669562055214c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java @@ -18,13 +18,13 @@ */ package org.apache.pulsar.broker.service; +import it.unimi.dsi.fastutil.longs.Long2IntMap; +import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import java.util.List; import java.util.concurrent.locks.StampedLock; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.common.util.collections.Long2IntMap; -import org.apache.pulsar.common.util.collections.Long2IntOpenHashMap; -import org.apache.pulsar.common.util.collections.Long2ObjectMap; -import org.apache.pulsar.common.util.collections.Long2ObjectOpenHashMap; public class InMemoryRedeliveryTracker implements RedeliveryTracker { // ledgerId -> entryId -> count diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 48d912d174c9e..9eb43efbd46db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -18,14 +18,15 @@ */ package org.apache.pulsar.broker.service; -import java.util.Iterator; -import java.util.Map; -import java.util.TreeMap; +import it.unimi.dsi.fastutil.ints.IntIntPair; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import it.unimi.dsi.fastutil.objects.ObjectBidirectionalIterator; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; -import org.apache.pulsar.common.util.collections.IntIntPair; /** * A thread-safe map to store pending acks in the consumer. @@ -97,7 +98,7 @@ public interface PendingAcksConsumer { } private final Consumer consumer; - private final TreeMap> pendingAcks; + private final Long2ObjectSortedMap> pendingAcks; private final Supplier pendingAcksAddHandlerSupplier; private final Supplier pendingAcksRemoveHandlerSupplier; private final Lock readLock; @@ -108,7 +109,7 @@ public interface PendingAcksConsumer { PendingAcksMap(Consumer consumer, Supplier pendingAcksAddHandlerSupplier, Supplier pendingAcksRemoveHandlerSupplier) { this.consumer = consumer; - this.pendingAcks = new TreeMap<>(); + this.pendingAcks = new Long2ObjectRBTreeMap<>(); this.pendingAcksAddHandlerSupplier = pendingAcksAddHandlerSupplier; this.pendingAcksRemoveHandlerSupplier = pendingAcksRemoveHandlerSupplier; ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -144,8 +145,8 @@ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remaining && !pendingAcksAddHandler.handleAdding(consumer, ledgerId, entryId, stickyKeyHash)) { return false; } - TreeMap ledgerPendingAcks = - pendingAcks.computeIfAbsent(ledgerId, k -> new TreeMap<>()); + Long2ObjectSortedMap ledgerPendingAcks = + pendingAcks.computeIfAbsent(ledgerId, k -> new Long2ObjectRBTreeMap<>()); IntIntPair previous = ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked, stickyKeyHash)); if (previous == null) { size++; @@ -183,12 +184,12 @@ public void forEach(PendingAcksConsumer processor) { private void processPendingAcks(PendingAcksConsumer processor) { // this code uses for loops intentionally, don't refactor to use forEach // iterate the outer map - for (Map.Entry> entry : pendingAcks.entrySet()) { - long ledgerId = entry.getKey(); - TreeMap ledgerPendingAcks = entry.getValue(); + for (Long2ObjectMap.Entry> entry : pendingAcks.long2ObjectEntrySet()) { + long ledgerId = entry.getLongKey(); + Long2ObjectSortedMap ledgerPendingAcks = entry.getValue(); // iterate the inner map - for (Map.Entry e : ledgerPendingAcks.entrySet()) { - long entryId = e.getKey(); + for (Long2ObjectMap.Entry e : ledgerPendingAcks.long2ObjectEntrySet()) { + long entryId = e.getLongKey(); IntIntPair batchSizeAndStickyKeyHash = e.getValue(); processor.accept(ledgerId, entryId, batchSizeAndStickyKeyHash.leftInt(), batchSizeAndStickyKeyHash.rightInt()); @@ -254,7 +255,7 @@ private void internalForEachAndClear(PendingAcksConsumer processor, boolean clos public boolean contains(long ledgerId, long entryId) { try { readLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -274,7 +275,7 @@ public boolean contains(long ledgerId, long entryId) { public IntIntPair get(long ledgerId, long entryId) { try { readLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return null; } @@ -296,7 +297,7 @@ public IntIntPair get(long ledgerId, long entryId) { public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyHash) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -326,7 +327,7 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -352,7 +353,7 @@ public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelt public boolean remove(long ledgerId, long entryId) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -384,7 +385,7 @@ public boolean remove(long ledgerId, long entryId) { public IntIntPair removeAndGet(long ledgerId, long entryId) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return null; } @@ -443,23 +444,23 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } else { readLock.lock(); } - Iterator>> ledgerMapIterator = - pendingAcks.headMap(markDeleteLedgerId + 1).entrySet().iterator(); + ObjectBidirectionalIterator>> ledgerMapIterator = + pendingAcks.headMap(markDeleteLedgerId + 1).long2ObjectEntrySet().iterator(); while (ledgerMapIterator.hasNext()) { - Map.Entry> entry = ledgerMapIterator.next(); - long ledgerId = entry.getKey(); - TreeMap ledgerMap = entry.getValue(); - TreeMap ledgerMapHead; + Long2ObjectMap.Entry> entry = ledgerMapIterator.next(); + long ledgerId = entry.getLongKey(); + Long2ObjectSortedMap ledgerMap = entry.getValue(); + Long2ObjectSortedMap ledgerMapHead; if (ledgerId == markDeleteLedgerId) { - ledgerMapHead = new TreeMap<>(ledgerMap.headMap(markDeleteEntryId + 1)); + ledgerMapHead = ledgerMap.headMap(markDeleteEntryId + 1); } else { ledgerMapHead = ledgerMap; } - Iterator> entryMapIterator = - ledgerMapHead.entrySet().iterator(); + ObjectBidirectionalIterator> entryMapIterator = + ledgerMapHead.long2ObjectEntrySet().iterator(); while (entryMapIterator.hasNext()) { - Map.Entry intIntPairEntry = entryMapIterator.next(); - long entryId = intIntPairEntry.getKey(); + Long2ObjectMap.Entry intIntPairEntry = entryMapIterator.next(); + long entryId = intIntPairEntry.getLongKey(); if (!acquiredWriteLock) { retryWithWriteLock = true; return; @@ -478,10 +479,6 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry removedEntryCallback.accept(ledgerId, entryId, batchSize, stickyKeyHash); } entryMapIterator.remove(); - // also remove from the original map if we're iterating a copy - if (ledgerId == markDeleteLedgerId) { - ledgerMap.remove(entryId); - } size--; } if (ledgerMap.isEmpty()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 02055b28f3b53..7b7b6596f0ca2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET; import com.google.common.annotations.VisibleForTesting; import io.github.merlimat.slog.Logger; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -58,7 +59,6 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.IntOpenHashSet; public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers implements StickyKeyDispatcher { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index f8a9dd8304294..1c16ed4228f33 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -272,7 +272,7 @@ public void run(Timeout timeout) throws Exception { return; } try { - this.delayedMessageMap.firstKey(); + this.delayedMessageMap.firstLongKey(); } catch (Exception e) { e.printStackTrace(); exceptions[0] = e; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index 8c033d4f731b2..c13babb126a30 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -29,9 +29,9 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import it.unimi.dsi.fastutil.ints.IntIntPair; import java.util.ArrayList; import java.util.List; -import org.apache.pulsar.common.util.collections.IntIntPair; import org.testng.annotations.Test; public class PendingAcksMapTest { diff --git a/pulsar-client/build.gradle.kts b/pulsar-client/build.gradle.kts index c6c623a9b82de..da53aeba55d5a 100644 --- a/pulsar-client/build.gradle.kts +++ b/pulsar-client/build.gradle.kts @@ -61,6 +61,7 @@ dependencies { implementation(libs.jsr305) api(libs.jspecify) implementation(libs.roaringbitmap) + implementation(libs.fastutil) compileOnly(libs.swagger.annotations) compileOnly(libs.protobuf.java) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index 7bbf58a13672d..1d42d947bdd56 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -23,11 +23,14 @@ import io.netty.util.Timeout; import io.netty.util.Timer; import io.opentelemetry.api.trace.Span; +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator; import java.io.Closeable; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.TimeUnit; import lombok.CustomLog; import org.apache.pulsar.client.api.Message; @@ -36,8 +39,6 @@ import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.api.TraceableMessageId; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.common.util.collections.Long2ObjectMap; -import org.apache.pulsar.common.util.collections.Long2ObjectOpenHashMap; import org.roaringbitmap.longlong.Roaring64Bitmap; @CustomLog @@ -45,8 +46,8 @@ class NegativeAcksTracker implements Closeable { // timestamp -> ledgerId -> entryId, no need to batch index, if different messages have // different timestamp, there will be multiple entries in the map - // TreeMap -> LongOpenHashMap -> Roaring64Bitmap - private TreeMap> nackedMessages = null; + // RB Tree -> LongOpenHashMap -> Roaring64Bitmap + private Long2ObjectSortedMap> nackedMessages = null; private final Long2ObjectMap> nackedMessageIds = new Long2ObjectOpenHashMap<>(); private final ConsumerBase consumer; @@ -86,7 +87,9 @@ private void triggerRedelivery(Timeout t) { } Long2ObjectMap ledgerMap = nackedMessages.get(timestamp); - ledgerMap.forEach((ledgerId, entrySet) -> { + for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entrySet = ledgerEntry.getValue(); entrySet.forEach(entryId -> { MessageId msgId = null; Long2ObjectMap entryMap = nackedMessageIds.get(ledgerId); @@ -102,13 +105,13 @@ private void triggerRedelivery(Timeout t) { addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer); messagesToRedeliver.add(msgId); }); - }); + } } // remove entries from the nackedMessages map - Iterator iterator = nackedMessages.keySet().iterator(); + LongBidirectionalIterator iterator = nackedMessages.keySet().iterator(); while (iterator.hasNext()) { - long timestamp = iterator.next(); + long timestamp = iterator.nextLong(); if (timestamp <= currentTimestamp) { iterator.remove(); } else { @@ -118,7 +121,7 @@ private void triggerRedelivery(Timeout t) { // Schedule the next redelivery if there are still messages to redeliver if (!nackedMessages.isEmpty()) { - long nextTriggerTimestamp = nackedMessages.firstKey(); + long nextTriggerTimestamp = nackedMessages.firstLongKey(); long delayMs = Math.max(nextTriggerTimestamp - currentTimestamp, 0); if (delayMs > 0) { this.timeout = timer.newTimeout(this::triggerRedelivery, delayMs, TimeUnit.MILLISECONDS); @@ -164,7 +167,7 @@ private synchronized void add(MessageId messageId, int redeliveryCount) { } if (nackedMessages == null) { - nackedMessages = new TreeMap<>(); + nackedMessages = new Long2ObjectAVLTreeMap<>(); } long backoffMs; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMap.java deleted file mode 100644 index ada501319d7dc..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMap.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -/** - * Open-addressing hash map with primitive int keys and object values. - * Uses linear probing and fibonacci hashing. - * Not thread-safe. - */ -@SuppressWarnings("unchecked") -public class Int2ObjectOpenHashMap { - - private static final float LOAD_FACTOR = 0.75f; - private static final int MIN_CAPACITY = 16; - - private int[] keys; - private Object[] values; - private boolean[] used; - private int size; - private int capacity; - private int threshold; - - public Int2ObjectOpenHashMap() { - this(MIN_CAPACITY); - } - - public Int2ObjectOpenHashMap(int expectedItems) { - int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); - keys = new int[cap]; - values = new Object[cap]; - used = new boolean[cap]; - capacity = cap; - threshold = (int) (cap * LOAD_FACTOR); - } - - public V get(int key) { - int idx = indexOf(key); - return idx >= 0 ? (V) values[idx] : null; - } - - public V put(int key, V value) { - int idx = indexOf(key); - if (idx >= 0) { - V old = (V) values[idx]; - values[idx] = value; - return old; - } - if (size >= threshold) { - rehash(capacity * 2); - } - insertNew(key, value); - return null; - } - - public V remove(int key) { - int idx = indexOf(key); - if (idx < 0) { - return null; - } - V old = (V) values[idx]; - removeAt(idx); - return old; - } - - /** - * Remove the entry only if it maps to the given value (by reference equality). - * - * @return true if the entry was removed - */ - public boolean remove(int key, Object value) { - int idx = indexOf(key); - if (idx < 0 || values[idx] != value) { - return false; - } - removeAt(idx); - return true; - } - - public boolean isEmpty() { - return size == 0; - } - - public int size() { - return size; - } - - public void clear() { - if (size > 0) { - java.util.Arrays.fill(used, false); - java.util.Arrays.fill(values, null); - size = 0; - } - } - - private int indexOf(int key) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (true) { - if (!used[idx]) { - return -1; - } - if (keys[idx] == key) { - return idx; - } - idx = (idx + 1) & mask; - } - } - - private void insertNew(int key, V value) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (used[idx]) { - idx = (idx + 1) & mask; - } - keys[idx] = key; - values[idx] = value; - used[idx] = true; - size++; - } - - private void removeAt(int idx) { - int mask = capacity - 1; - values[idx] = null; - size--; - int next = (idx + 1) & mask; - while (used[next]) { - int naturalSlot = hash(keys[next]) & mask; - if ((next > idx && (naturalSlot <= idx || naturalSlot > next)) - || (next < idx && (naturalSlot <= idx && naturalSlot > next))) { - keys[idx] = keys[next]; - values[idx] = values[next]; - values[next] = null; - idx = next; - } - next = (next + 1) & mask; - } - used[idx] = false; - } - - private void rehash(int newCapacity) { - int[] oldKeys = keys; - Object[] oldValues = values; - boolean[] oldUsed = used; - int oldCapacity = capacity; - - capacity = newCapacity; - keys = new int[newCapacity]; - values = new Object[newCapacity]; - used = new boolean[newCapacity]; - threshold = (int) (newCapacity * LOAD_FACTOR); - size = 0; - - for (int i = 0; i < oldCapacity; i++) { - if (oldUsed[i]) { - insertNew(oldKeys[i], (V) oldValues[i]); - } - } - } - - /** - * Fibonacci hashing for int keys. - */ - private static int hash(int key) { - int h = key * 0x9E3779B9; - return h ^ (h >>> 16); - } - - private static int tableSizeFor(int cap) { - int n = cap - 1; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntIntPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntIntPair.java deleted file mode 100644 index ee6540164c05d..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntIntPair.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -/** - * An immutable pair of int values. - */ -public record IntIntPair(int leftInt, int rightInt) { - public static IntIntPair of(int left, int right) { - return new IntIntPair(left, right); - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntOpenHashSet.java deleted file mode 100644 index be7ffd8076e44..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntOpenHashSet.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -/** - * Open-addressing hash set for primitive int values. - * Not thread-safe. - */ -public class IntOpenHashSet { - - private static final float LOAD_FACTOR = 0.75f; - private static final int MIN_CAPACITY = 16; - - private int[] keys; - private boolean[] used; - private int size; - private int capacity; - private int threshold; - - public IntOpenHashSet() { - this(MIN_CAPACITY); - } - - public IntOpenHashSet(int expectedItems) { - int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); - keys = new int[cap]; - used = new boolean[cap]; - capacity = cap; - threshold = (int) (cap * LOAD_FACTOR); - } - - public boolean add(int key) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (true) { - if (!used[idx]) { - if (size >= threshold) { - rehash(capacity * 2); - return add(key); - } - keys[idx] = key; - used[idx] = true; - size++; - return true; - } - if (keys[idx] == key) { - return false; - } - idx = (idx + 1) & mask; - } - } - - public boolean contains(int key) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (true) { - if (!used[idx]) { - return false; - } - if (keys[idx] == key) { - return true; - } - idx = (idx + 1) & mask; - } - } - - public int size() { - return size; - } - - public boolean isEmpty() { - return size == 0; - } - - private void rehash(int newCapacity) { - int[] oldKeys = keys; - boolean[] oldUsed = used; - int oldCapacity = capacity; - - capacity = newCapacity; - keys = new int[newCapacity]; - used = new boolean[newCapacity]; - threshold = (int) (newCapacity * LOAD_FACTOR); - size = 0; - - for (int i = 0; i < oldCapacity; i++) { - if (oldUsed[i]) { - add(oldKeys[i]); - } - } - } - - private static int hash(int key) { - int h = key * 0x9E3779B9; - return h ^ (h >>> 16); - } - - private static int tableSizeFor(int cap) { - int n = cap - 1; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntMap.java deleted file mode 100644 index 10b2e30b9e478..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntMap.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.function.LongToIntFunction; - -/** - * A map with primitive {@code long} keys and primitive {@code int} values. - * - *

The default return value for missing keys is {@code 0}. - */ -public interface Long2IntMap { - - /** - * Returns the value for the given key, or {@code 0} if not present. - * - * @param key the key - * @return the mapped value, or {@code 0} - */ - int get(long key); - - /** - * Associates the given value with the given key. - * - * @param key the key - * @param value the value - * @return the previous value, or {@code 0} if there was no mapping - */ - int put(long key, int value); - - /** - * Removes the mapping for the given key. - * - * @param key the key - * @return the previous value, or {@code 0} if there was no mapping - */ - int remove(long key); - - /** - * Returns the value for the given key, or the specified default if not present. - * - * @param key the key - * @param defaultValue the default value to return if the key is absent - * @return the mapped value, or {@code defaultValue} - */ - int getOrDefault(long key, int defaultValue); - - /** - * If the key is not already present, computes its value using the given function - * and inserts it. - * - * @param key the key - * @param mappingFunction the function to compute a value - * @return the current (existing or computed) value - */ - int computeIfAbsent(long key, LongToIntFunction mappingFunction); - - /** - * Returns {@code true} if this map contains no entries. - */ - boolean isEmpty(); - - /** - * Removes all entries from this map. - */ - void clear(); -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMap.java deleted file mode 100644 index 22e876dc68373..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMap.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.function.LongToIntFunction; - -/** - * Open-addressing hash map with primitive long keys and primitive int values. - * Uses linear probing and fibonacci hashing. Returns 0 for missing keys. - * Not thread-safe. - */ -public class Long2IntOpenHashMap implements Long2IntMap { - - private static final float LOAD_FACTOR = 0.75f; - private static final int MIN_CAPACITY = 16; - - private long[] keys; - private int[] values; - private boolean[] used; - private int size; - private int capacity; - private int threshold; - - public Long2IntOpenHashMap() { - this(MIN_CAPACITY); - } - - public Long2IntOpenHashMap(int expectedItems) { - int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); - keys = new long[cap]; - values = new int[cap]; - used = new boolean[cap]; - capacity = cap; - threshold = (int) (cap * LOAD_FACTOR); - } - - @Override - public int get(long key) { - int idx = indexOf(key); - return idx >= 0 ? values[idx] : 0; - } - - @Override - public int put(long key, int value) { - int idx = indexOf(key); - if (idx >= 0) { - int old = values[idx]; - values[idx] = value; - return old; - } - if (size >= threshold) { - rehash(capacity * 2); - } - insertNew(key, value); - return 0; - } - - @Override - public int remove(long key) { - int idx = indexOf(key); - if (idx < 0) { - return 0; - } - int old = values[idx]; - removeAt(idx); - return old; - } - - @Override - public int getOrDefault(long key, int defaultValue) { - int idx = indexOf(key); - return idx >= 0 ? values[idx] : defaultValue; - } - - @Override - public int computeIfAbsent(long key, LongToIntFunction mappingFunction) { - int idx = indexOf(key); - if (idx >= 0) { - return values[idx]; - } - int value = mappingFunction.applyAsInt(key); - if (size >= threshold) { - rehash(capacity * 2); - } - insertNew(key, value); - return value; - } - - @Override - public boolean isEmpty() { - return size == 0; - } - - @Override - public void clear() { - if (size > 0) { - java.util.Arrays.fill(used, false); - size = 0; - } - } - - private int indexOf(long key) { - int mask = capacity - 1; - int idx = Long2ObjectOpenHashMap.hash(key) & mask; - while (true) { - if (!used[idx]) { - return -1; - } - if (keys[idx] == key) { - return idx; - } - idx = (idx + 1) & mask; - } - } - - private void insertNew(long key, int value) { - int mask = capacity - 1; - int idx = Long2ObjectOpenHashMap.hash(key) & mask; - while (used[idx]) { - idx = (idx + 1) & mask; - } - keys[idx] = key; - values[idx] = value; - used[idx] = true; - size++; - } - - private void removeAt(int idx) { - int mask = capacity - 1; - size--; - int next = (idx + 1) & mask; - while (used[next]) { - int naturalSlot = Long2ObjectOpenHashMap.hash(keys[next]) & mask; - if ((next > idx && (naturalSlot <= idx || naturalSlot > next)) - || (next < idx && (naturalSlot <= idx && naturalSlot > next))) { - keys[idx] = keys[next]; - values[idx] = values[next]; - idx = next; - } - next = (next + 1) & mask; - } - used[idx] = false; - } - - private void rehash(int newCapacity) { - long[] oldKeys = keys; - int[] oldValues = values; - boolean[] oldUsed = used; - int oldCapacity = capacity; - - capacity = newCapacity; - keys = new long[newCapacity]; - values = new int[newCapacity]; - used = new boolean[newCapacity]; - threshold = (int) (newCapacity * LOAD_FACTOR); - size = 0; - - for (int i = 0; i < oldCapacity; i++) { - if (oldUsed[i]) { - insertNew(oldKeys[i], oldValues[i]); - } - } - } - - private static int tableSizeFor(int cap) { - int n = cap - 1; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectMap.java deleted file mode 100644 index 56891027d46d4..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectMap.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.Collection; -import java.util.function.LongFunction; - -/** - * A map with primitive {@code long} keys and object values. - * - *

Using primitive keys avoids the overhead of boxing {@code long} values into - * {@link Long} objects, reducing both memory usage and GC pressure compared to - * {@code Map}. - * - * @param the type of mapped values - */ -public interface Long2ObjectMap { - - /** - * Returns the value associated with the given key, or {@code null} if not present. - * - * @param key the key - * @return the value, or {@code null} - */ - V get(long key); - - /** - * Associates the given value with the given key. - * - * @param key the key - * @param value the value - * @return the previous value, or {@code null} if there was no mapping - */ - V put(long key, V value); - - /** - * Removes the mapping for the given key. - * - * @param key the key - * @return the previous value, or {@code null} if there was no mapping - */ - V remove(long key); - - /** - * If the key is not already present, computes its value using the given function - * and inserts it. - * - * @param key the key - * @param mappingFunction the function to compute a value - * @return the current (existing or computed) value - */ - V computeIfAbsent(long key, LongFunction mappingFunction); - - /** - * Returns {@code true} if this map contains no entries. - */ - boolean isEmpty(); - - /** - * Returns the number of entries in this map. - */ - int size(); - - /** - * Removes all entries from this map. - */ - void clear(); - - /** - * Iterates over all entries, calling the consumer with primitive long keys - * (no boxing). - * - * @param consumer the consumer to call for each entry - */ - void forEach(LongObjConsumer consumer); - - /** - * Returns a {@link Collection} view of the values in this map. - * The collection supports iteration and {@code stream()} operations. - */ - Collection values(); -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMap.java deleted file mode 100644 index 70bc888bc84b3..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMap.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.AbstractCollection; -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.function.LongFunction; - -/** - * Open-addressing hash map with primitive long keys and object values. - * Uses linear probing and fibonacci hashing for good distribution. - * Not thread-safe. - */ -@SuppressWarnings("unchecked") -public class Long2ObjectOpenHashMap implements Long2ObjectMap { - - private static final float LOAD_FACTOR = 0.75f; - private static final int MIN_CAPACITY = 16; - - private long[] keys; - private Object[] values; - private boolean[] used; - private int size; - private int capacity; - private int threshold; - - /** - * Creates a new map with default capacity. - */ - public Long2ObjectOpenHashMap() { - this(MIN_CAPACITY); - } - - /** - * Creates a new map sized to hold the expected number of items without rehashing. - * - * @param expectedItems the expected number of items - */ - public Long2ObjectOpenHashMap(int expectedItems) { - int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); - keys = new long[cap]; - values = new Object[cap]; - used = new boolean[cap]; - capacity = cap; - threshold = (int) (cap * LOAD_FACTOR); - } - - @Override - public V get(long key) { - int idx = indexOf(key); - if (idx >= 0) { - return (V) values[idx]; - } - return null; - } - - @Override - public V put(long key, V value) { - int idx = indexOf(key); - if (idx >= 0) { - V old = (V) values[idx]; - values[idx] = value; - return old; - } - if (size >= threshold) { - rehash(capacity * 2); - } - insertNew(key, value); - return null; - } - - @Override - public V remove(long key) { - int idx = indexOf(key); - if (idx < 0) { - return null; - } - V old = (V) values[idx]; - removeAt(idx); - return old; - } - - @Override - public V computeIfAbsent(long key, LongFunction mappingFunction) { - int idx = indexOf(key); - if (idx >= 0) { - return (V) values[idx]; - } - V value = mappingFunction.apply(key); - if (value != null) { - if (size >= threshold) { - rehash(capacity * 2); - } - insertNew(key, value); - } - return value; - } - - @Override - public boolean isEmpty() { - return size == 0; - } - - @Override - public int size() { - return size; - } - - @Override - public void clear() { - if (size > 0) { - java.util.Arrays.fill(used, false); - java.util.Arrays.fill(values, null); - size = 0; - } - } - - @Override - public void forEach(LongObjConsumer consumer) { - for (int i = 0; i < capacity; i++) { - if (used[i]) { - consumer.accept(keys[i], (V) values[i]); - } - } - } - - @Override - public Collection values() { - return new ValuesCollection(); - } - - /** - * Find the index of the given key, or -1 if not present. - */ - private int indexOf(long key) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (true) { - if (!used[idx]) { - return -1; - } - if (keys[idx] == key) { - return idx; - } - idx = (idx + 1) & mask; - } - } - - private void insertNew(long key, V value) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (used[idx]) { - idx = (idx + 1) & mask; - } - keys[idx] = key; - values[idx] = value; - used[idx] = true; - size++; - } - - /** - * Remove the entry at the given index using backward-shift deletion - * (no tombstones needed). - */ - private void removeAt(int idx) { - int mask = capacity - 1; - values[idx] = null; - size--; - // Shift back entries that may have been displaced by the removed entry - int next = (idx + 1) & mask; - while (used[next]) { - int naturalSlot = hash(keys[next]) & mask; - // Check if 'next' is displaced past 'idx' (wrapping around) - if ((next > idx && (naturalSlot <= idx || naturalSlot > next)) - || (next < idx && (naturalSlot <= idx && naturalSlot > next))) { - keys[idx] = keys[next]; - values[idx] = values[next]; - values[next] = null; - idx = next; - } - next = (next + 1) & mask; - } - used[idx] = false; - } - - private void rehash(int newCapacity) { - long[] oldKeys = keys; - Object[] oldValues = values; - boolean[] oldUsed = used; - int oldCapacity = capacity; - - capacity = newCapacity; - keys = new long[newCapacity]; - values = new Object[newCapacity]; - used = new boolean[newCapacity]; - threshold = (int) (newCapacity * LOAD_FACTOR); - size = 0; - - for (int i = 0; i < oldCapacity; i++) { - if (oldUsed[i]) { - insertNew(oldKeys[i], (V) oldValues[i]); - } - } - } - - /** - * Fibonacci hashing for long keys. - */ - static int hash(long key) { - long h = key * 0x9E3779B97F4A7C15L; - return (int) (h ^ (h >>> 32)); - } - - private static int tableSizeFor(int cap) { - int n = cap - 1; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; - } - - private class ValuesCollection extends AbstractCollection { - @Override - public Iterator iterator() { - return new Iterator<>() { - private int idx = findNext(0); - - @Override - public boolean hasNext() { - return idx < capacity; - } - - @Override - public V next() { - if (idx >= capacity) { - throw new NoSuchElementException(); - } - V val = (V) values[idx]; - idx = findNext(idx + 1); - return val; - } - - private int findNext(int from) { - while (from < capacity && !used[from]) { - from++; - } - return from; - } - }; - } - - @Override - public int size() { - return Long2ObjectOpenHashMap.this.size; - } - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongObjConsumer.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongObjConsumer.java deleted file mode 100644 index a3dd6108e51e1..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongObjConsumer.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -/** - * A consumer that accepts a primitive long key and an object value. - */ -@FunctionalInterface -public interface LongObjConsumer { - void accept(long key, V value); -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongOpenHashSet.java deleted file mode 100644 index 48f3720dff7a7..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongOpenHashSet.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.function.LongConsumer; - -/** - * Open-addressing hash set for primitive long values. - * Not thread-safe. - */ -public class LongOpenHashSet implements Iterable { - - private static final float LOAD_FACTOR = 0.75f; - private static final int MIN_CAPACITY = 16; - - private long[] keys; - private boolean[] used; - private int size; - private int capacity; - private int threshold; - - public LongOpenHashSet() { - this(MIN_CAPACITY); - } - - public LongOpenHashSet(int expectedItems) { - int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); - keys = new long[cap]; - used = new boolean[cap]; - capacity = cap; - threshold = (int) (cap * LOAD_FACTOR); - } - - public boolean add(long key) { - int mask = capacity - 1; - int idx = Long2ObjectOpenHashMap.hash(key) & mask; - while (true) { - if (!used[idx]) { - if (size >= threshold) { - rehash(capacity * 2); - return add(key); - } - keys[idx] = key; - used[idx] = true; - size++; - return true; - } - if (keys[idx] == key) { - return false; - } - idx = (idx + 1) & mask; - } - } - - public boolean contains(long key) { - int mask = capacity - 1; - int idx = Long2ObjectOpenHashMap.hash(key) & mask; - while (true) { - if (!used[idx]) { - return false; - } - if (keys[idx] == key) { - return true; - } - idx = (idx + 1) & mask; - } - } - - public int size() { - return size; - } - - public boolean isEmpty() { - return size == 0; - } - - public void forEach(LongConsumer consumer) { - for (int i = 0; i < capacity; i++) { - if (used[i]) { - consumer.accept(keys[i]); - } - } - } - - @Override - public Iterator iterator() { - return new Iterator<>() { - private int idx = findNext(0); - - @Override - public boolean hasNext() { - return idx < capacity; - } - - @Override - public Long next() { - if (idx >= capacity) { - throw new NoSuchElementException(); - } - long val = keys[idx]; - idx = findNext(idx + 1); - return val; - } - - private int findNext(int from) { - while (from < capacity && !used[from]) { - from++; - } - return from; - } - }; - } - - private void rehash(int newCapacity) { - long[] oldKeys = keys; - boolean[] oldUsed = used; - int oldCapacity = capacity; - - capacity = newCapacity; - keys = new long[newCapacity]; - used = new boolean[newCapacity]; - threshold = (int) (newCapacity * LOAD_FACTOR); - size = 0; - - for (int i = 0; i < oldCapacity; i++) { - if (oldUsed[i]) { - add(oldKeys[i]); - } - } - } - - private static int tableSizeFor(int cap) { - int n = cap - 1; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ObjectIntPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ObjectIntPair.java deleted file mode 100644 index 8134afb091e25..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ObjectIntPair.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -/** - * An immutable pair of an object and an int value. - */ -public record ObjectIntPair(T left, int rightInt) { - public static ObjectIntPair of(T left, int right) { - return new ObjectIntPair<>(left, right); - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/HashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/HashSetTest.java deleted file mode 100644 index 2558e6af38d58..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/HashSetTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.testng.annotations.Test; - -public class HashSetTest { - - @Test - public void testLongOpenHashSetBasic() { - LongOpenHashSet set = new LongOpenHashSet(); - assertTrue(set.isEmpty()); - assertTrue(set.add(1)); - assertTrue(set.add(2)); - assertFalse(set.add(1)); // duplicate - assertEquals(set.size(), 2); - assertTrue(set.contains(1)); - assertTrue(set.contains(2)); - assertFalse(set.contains(3)); - } - - @Test - public void testLongOpenHashSetIterable() { - LongOpenHashSet set = new LongOpenHashSet(); - set.add(3); - set.add(1); - set.add(2); - List values = new ArrayList<>(); - for (long v : set) { - values.add(v); - } - Collections.sort(values); - assertEquals(values, List.of(1L, 2L, 3L)); - } - - @Test - public void testLongOpenHashSetForEach() { - LongOpenHashSet set = new LongOpenHashSet(); - set.add(10); - set.add(20); - List values = new ArrayList<>(); - set.forEach((long v) -> values.add(v)); - Collections.sort(values); - assertEquals(values, List.of(10L, 20L)); - } - - @Test - public void testLongOpenHashSetRehash() { - LongOpenHashSet set = new LongOpenHashSet(4); - for (int i = 0; i < 100; i++) { - set.add(i); - } - assertEquals(set.size(), 100); - for (int i = 0; i < 100; i++) { - assertTrue(set.contains(i)); - } - } - - @Test - public void testIntOpenHashSetBasic() { - IntOpenHashSet set = new IntOpenHashSet(); - assertTrue(set.isEmpty()); - assertTrue(set.add(1)); - assertTrue(set.add(2)); - assertFalse(set.add(1)); // duplicate - assertEquals(set.size(), 2); - assertTrue(set.contains(1)); - assertTrue(set.contains(2)); - assertFalse(set.contains(3)); - } - - @Test - public void testIntOpenHashSetRehash() { - IntOpenHashSet set = new IntOpenHashSet(4); - for (int i = 0; i < 100; i++) { - set.add(i); - } - assertEquals(set.size(), 100); - for (int i = 0; i < 100; i++) { - assertTrue(set.contains(i)); - } - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMapTest.java deleted file mode 100644 index 75c9e4d3d4b94..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMapTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import org.testng.annotations.Test; - -public class Int2ObjectOpenHashMapTest { - - @Test - public void testEmpty() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(); - assertTrue(map.isEmpty()); - assertNull(map.get(1)); - } - - @Test - public void testPutGet() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(); - assertNull(map.put(1, "one")); - assertNull(map.put(2, "two")); - assertEquals(map.get(1), "one"); - assertEquals(map.get(2), "two"); - assertNull(map.get(3)); - assertEquals(map.size(), 2); - } - - @Test - public void testRemove() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - assertEquals(map.remove(1), "one"); - assertNull(map.get(1)); - assertEquals(map.size(), 1); - } - - @Test - public void testRemoveConditional() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(); - String val = "one"; - map.put(1, val); - assertFalse(map.remove(1, "other")); // different ref - assertTrue(map.remove(1, val)); - assertTrue(map.isEmpty()); - } - - @Test - public void testClear() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.clear(); - assertTrue(map.isEmpty()); - assertNull(map.get(1)); - } - - @Test - public void testRehash() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(4); - for (int i = 0; i < 100; i++) { - map.put(i, i); - } - assertEquals(map.size(), 100); - for (int i = 0; i < 100; i++) { - assertEquals(map.get(i), Integer.valueOf(i)); - } - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMapTest.java deleted file mode 100644 index 7ced026525274..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMapTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import org.testng.annotations.Test; - -public class Long2IntOpenHashMapTest { - - @Test - public void testEmpty() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - assertTrue(map.isEmpty()); - assertEquals(map.get(0), 0); - assertEquals(map.get(1), 0); - } - - @Test - public void testPutGet() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - assertEquals(map.put(1, 10), 0); - assertEquals(map.put(2, 20), 0); - assertFalse(map.isEmpty()); - assertEquals(map.get(1), 10); - assertEquals(map.get(2), 20); - assertEquals(map.get(3), 0); // default - } - - @Test - public void testPutReplace() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - map.put(1, 10); - assertEquals(map.put(1, 100), 10); - assertEquals(map.get(1), 100); - } - - @Test - public void testRemove() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - map.put(1, 10); - map.put(2, 20); - assertEquals(map.remove(1), 10); - assertEquals(map.get(1), 0); // default after removal - assertEquals(map.remove(99), 0); // not present - } - - @Test - public void testGetOrDefault() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - map.put(1, 10); - assertEquals(map.getOrDefault(1, -1), 10); - assertEquals(map.getOrDefault(2, -1), -1); - } - - @Test - public void testComputeIfAbsent() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - assertEquals(map.computeIfAbsent(1, k -> 10), 10); - assertEquals(map.computeIfAbsent(1, k -> 99), 10); - } - - @Test - public void testClear() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - map.put(1, 10); - map.put(2, 20); - map.clear(); - assertTrue(map.isEmpty()); - assertEquals(map.get(1), 0); - } - - @Test - public void testRehash() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(4); - for (int i = 0; i < 100; i++) { - map.put(i, i * 10); - } - for (int i = 0; i < 100; i++) { - assertEquals(map.get(i), i * 10); - } - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMapTest.java deleted file mode 100644 index 6e33a5b531505..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMapTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.testng.annotations.Test; - -public class Long2ObjectOpenHashMapTest { - - @Test - public void testEmpty() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - assertTrue(map.isEmpty()); - assertEquals(map.size(), 0); - assertNull(map.get(0)); - assertNull(map.get(1)); - assertNull(map.remove(1)); - } - - @Test - public void testPutGet() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - assertNull(map.put(1, "one")); - assertNull(map.put(2, "two")); - assertNull(map.put(3, "three")); - assertEquals(map.size(), 3); - assertFalse(map.isEmpty()); - assertEquals(map.get(1), "one"); - assertEquals(map.get(2), "two"); - assertEquals(map.get(3), "three"); - assertNull(map.get(4)); - } - - @Test - public void testPutReplace() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - assertNull(map.put(1, "one")); - assertEquals(map.put(1, "ONE"), "one"); - assertEquals(map.size(), 1); - assertEquals(map.get(1), "ONE"); - } - - @Test - public void testRemove() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.put(3, "three"); - assertEquals(map.remove(2), "two"); - assertEquals(map.size(), 2); - assertNull(map.get(2)); - assertEquals(map.get(1), "one"); - assertEquals(map.get(3), "three"); - } - - @Test - public void testComputeIfAbsent() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - assertEquals(map.computeIfAbsent(1, k -> "one"), "one"); - assertEquals(map.size(), 1); - assertEquals(map.computeIfAbsent(1, k -> "other"), "one"); - assertEquals(map.size(), 1); - } - - @Test - public void testClear() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.clear(); - assertTrue(map.isEmpty()); - assertEquals(map.size(), 0); - assertNull(map.get(1)); - } - - @Test - public void testForEach() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.put(3, "three"); - Map collected = new HashMap<>(); - map.forEach(collected::put); - assertEquals(collected.size(), 3); - assertEquals(collected.get(1L), "one"); - assertEquals(collected.get(2L), "two"); - assertEquals(collected.get(3L), "three"); - } - - @Test - public void testValues() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.put(3, "three"); - List values = new ArrayList<>(map.values()); - assertEquals(values.size(), 3); - assertTrue(values.contains("one")); - assertTrue(values.contains("two")); - assertTrue(values.contains("three")); - } - - @Test - public void testRehash() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(4); - for (int i = 0; i < 100; i++) { - map.put(i, i); - } - assertEquals(map.size(), 100); - for (int i = 0; i < 100; i++) { - assertEquals(map.get(i), Integer.valueOf(i)); - } - } - - @Test - public void testZeroKey() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(0, "zero"); - assertEquals(map.get(0), "zero"); - assertEquals(map.size(), 1); - assertEquals(map.remove(0), "zero"); - assertTrue(map.isEmpty()); - } - - @Test - public void testNegativeKeys() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(-1, "neg1"); - map.put(Long.MIN_VALUE, "min"); - assertEquals(map.get(-1), "neg1"); - assertEquals(map.get(Long.MIN_VALUE), "min"); - } - - @Test - public void testRemoveAndReinsert() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - for (int i = 0; i < 50; i++) { - map.put(i, "v" + i); - } - for (int i = 0; i < 25; i++) { - map.remove(i); - } - assertEquals(map.size(), 25); - for (int i = 25; i < 50; i++) { - assertEquals(map.get(i), "v" + i); - } - // Reinsert - for (int i = 0; i < 25; i++) { - map.put(i, "new" + i); - } - assertEquals(map.size(), 50); - for (int i = 0; i < 25; i++) { - assertEquals(map.get(i), "new" + i); - } - } -} From 57d81f0cf04d2f6de7628de4d9c2fd2bdbd8af7d Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 15 Jun 2026 15:57:33 +0300 Subject: [PATCH 02/10] [improve][build] Minimize bundled fastutil classes in the shaded client jars ### Motivation The full fastutil jar is ~25MB / ~12,965 classes, of which the Pulsar client only uses a handful (via NegativeAcksTracker). Bundling all of it into the shaded client jars is wasteful. The Maven build (branch-4.2) avoids this with a `pulsar-client-dependencies-minimized` module driven by maven-shade-plugin's minimizeJar; this is the Gradle equivalent. ### Modifications - Reimplement pulsar-client-dependencies-minimized/build.gradle.kts on the GradleUp Shadow plugin. It declares pulsar-client-original with the `api` scope so Shadow's minimize() seeds reachability from the whole client closure, bundles only the libraries listed in `minimizedDependencies` ("group:name" entries, currently just fastutil), and prunes everything unreachable -> 591 fastutil classes (matching the Maven minimizeJar output, down from ~12,965). - Add a verifyMinimizedJar check that fails the build if a required class is pruned or if minimize() silently becomes a no-op. - Strip the build-only `api` seed from the module's outgoing variants so it exposes no transitive dependencies to consumers (self-contained fastutil-only jar). - Wire it in: settings include, relocate it.unimi.dsi.fastutil in the client shade conventions, and have pulsar-client-shaded / -all / -admin-shaded exclude fastutil from pulsar-client(-admin)-original and bundle the minimized module instead. Assisted-by: Claude Code --- ...pulsar.client-shade-conventions.gradle.kts | 2 + pulsar-client-admin-shaded/build.gradle.kts | 7 +- pulsar-client-all/build.gradle.kts | 7 +- .../build.gradle.kts | 129 ++++++++++++++++++ pulsar-client-shaded/build.gradle.kts | 7 +- settings.gradle.kts | 3 + 6 files changed, 152 insertions(+), 3 deletions(-) create mode 100644 pulsar-client-dependencies-minimized/build.gradle.kts diff --git a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts index 60ed0cfa67cf3..a3ef4dfe83c85 100644 --- a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts +++ b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts @@ -37,6 +37,7 @@ tasks.named("shadowJ include(project(":pulsar-client-admin-original")) include(project(":pulsar-common")) include(project(":pulsar-client-messagecrypto-bc")) + include(project(":pulsar-client-dependencies-minimized")) include(dependency("com.fasterxml.jackson.*:.*")) include(dependency("com.google.*:.*")) include(dependency("com.google.auth:.*")) @@ -143,6 +144,7 @@ tasks.named("shadowJ relocateWithPrefix(shadePrefix, "io.opencensus") relocateWithPrefix(shadePrefix, "io.prometheus.client") relocateWithPrefix(shadePrefix, "io.swagger") + relocateWithPrefix(shadePrefix, "it.unimi.dsi.fastutil") relocateWithPrefix(shadePrefix, "javassist") relocateWithPrefix(shadePrefix, "jakarta.activation") relocateWithPrefix(shadePrefix, "jakarta.annotation") diff --git a/pulsar-client-admin-shaded/build.gradle.kts b/pulsar-client-admin-shaded/build.gradle.kts index 8fe0120d3a264..659f60244ee02 100644 --- a/pulsar-client-admin-shaded/build.gradle.kts +++ b/pulsar-client-admin-shaded/build.gradle.kts @@ -24,7 +24,12 @@ plugins { dependencies { // Bundled into the shaded jar (kept on the runtime classpath so shadowJar packs them): - implementation(project(":pulsar-client-admin-original")) + implementation(project(":pulsar-client-admin-original")) { + // fastutil is bundled via :pulsar-client-dependencies-minimized, which packs only the + // (unrelocated) fastutil classes actually used instead of the full ~25MB jar. + exclude(group = "it.unimi.dsi", module = "fastutil") + } + implementation(project(":pulsar-client-dependencies-minimized")) implementation(project(":pulsar-client-messagecrypto-bc")) // Non-bundled runtime dependencies for the dependency-reduced published POM/GMM (the `shadow` diff --git a/pulsar-client-all/build.gradle.kts b/pulsar-client-all/build.gradle.kts index 7c49a7d5465ae..eedbb2b667c64 100644 --- a/pulsar-client-all/build.gradle.kts +++ b/pulsar-client-all/build.gradle.kts @@ -24,7 +24,12 @@ plugins { dependencies { // Bundled into the shaded jar (kept on the runtime classpath so shadowJar packs them): - implementation(project(":pulsar-client-original")) + implementation(project(":pulsar-client-original")) { + // fastutil is bundled via :pulsar-client-dependencies-minimized, which packs only the + // (unrelocated) fastutil classes actually used instead of the full ~25MB jar. + exclude(group = "it.unimi.dsi", module = "fastutil") + } + implementation(project(":pulsar-client-dependencies-minimized")) implementation(project(":pulsar-client-admin-original")) implementation(project(":pulsar-client-messagecrypto-bc")) diff --git a/pulsar-client-dependencies-minimized/build.gradle.kts b/pulsar-client-dependencies-minimized/build.gradle.kts new file mode 100644 index 0000000000000..de495a0677ac0 --- /dev/null +++ b/pulsar-client-dependencies-minimized/build.gradle.kts @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Imported explicitly: the `java` plugin contributes a `java { }` extension accessor, +// so an unqualified `java.util.zip.ZipFile` would resolve `java` to that extension. +import java.util.zip.ZipFile + +// Produces a jar containing ONLY the classes of the "minimized" dependencies that +// are reachable from pulsar-client-original (and its full transitive closure) — +// exactly the classes that end up shaded into the final pulsar-client jars. It is +// bundled (and relocated) by :pulsar-client-shaded, :pulsar-client-all and +// :pulsar-client-admin-shaded so the full dependency jars are not shipped. +// +// This is the Gradle equivalent of the branch-4.2 Maven module, which uses +// maven-shade-plugin's over {pulsar-client-original, fastutil}. +// +// How it works: +// * pulsar-client-original is declared with the `api` scope. The Shadow plugin's +// minimize() seeds its reachability analysis (UnusedTracker) from the project's +// own source classes plus its `api`-scoped jars — so the entire pulsar-client +// closure becomes the set of reachability roots. (With `implementation`, or no +// source, minimize() has no roots and keeps the full ~12,900-class fastutil jar.) +// * Only the minimized libraries are bundled into the jar (the `include` filter +// below); pulsar-client-original itself is read purely as a reachability root. +// * minimize() then drops every bundled class not reachable from those roots. + +plugins { + id("pulsar.java-conventions") + id("pulsar.shadow-conventions") +} + +// Dependencies to minimize, as "group:name" entries. Only the classes from these +// artifacts that are actually reachable from the pulsar-client closure are kept. Add +// more entries here when another heavy dependency needs to be minimized the same way. +val minimizedDependencies: List = listOf( + "it.unimi.dsi:fastutil", +) + +dependencies { + // `api` (not `implementation`) so minimize() uses pulsar-client-original and its + // transitive closure as the reachability roots. Its own classes are not bundled. + api(project(":pulsar-client-original")) +} + +// The api dependency above is a BUILD-ONLY reachability seed for minimize(). This module +// ships a self-contained jar of minimized fastutil classes and must not drag +// pulsar-client-original (or anything else) onto consumers' classpaths, so strip all +// inherited dependencies from the consumable (outgoing) variants. +listOf("apiElements", "runtimeElements").forEach { variant -> + configurations.named(variant) { + setExtendsFrom(emptySet()) + } +} + +tasks.shadowJar { + // Bundle ONLY the minimized libraries; pulsar-client-original is read by minimize() + // as a reachability root but is excluded from the output jar. + dependencies { + minimizedDependencies.forEach { coords -> include(dependency("$coords:.*")) } + } + // Drop every bundled class not reachable from the api reachability roots above. + minimize() +} + +// --------------------------------------------------------------------------- +// Verification: fail the build if the entry points statically referenced by +// pulsar-client-original (NegativeAcksTracker, the only direct fastutil consumer) +// are ever dropped by over-pruning, or if minimize() silently becomes a no-op (which +// would ship the full fastutil jar). Add to this list when new usage is introduced. +// --------------------------------------------------------------------------- +val requiredMinimizedClasses = listOf( + "it/unimi/dsi/fastutil/longs/Long2ObjectAVLTreeMap.class", + "it/unimi/dsi/fastutil/longs/Long2ObjectMap.class", + "it/unimi/dsi/fastutil/longs/Long2ObjectOpenHashMap.class", + "it/unimi/dsi/fastutil/longs/Long2ObjectSortedMap.class", + "it/unimi/dsi/fastutil/longs/LongBidirectionalIterator.class", +) + +// Upper bound that comfortably exceeds the reachable set (~1k classes) but is well +// below the full fastutil jar (~12,965 classes), so a minimize() regression fails loudly. +val maxRetainedClasses = 5000 + +val verifyMinimizedJar by tasks.registering { + val jarFile = tasks.shadowJar.flatMap { it.archiveFile } + val required = requiredMinimizedClasses + val maxClasses = maxRetainedClasses + inputs.file(jarFile) + doLast { + val jar = jarFile.get().asFile + val entries = mutableSetOf() + ZipFile(jar).use { zf -> + val e = zf.entries() + while (e.hasMoreElements()) { + entries.add(e.nextElement().name) + } + } + val classCount = entries.count { it.endsWith(".class") } + val missing = required.filterNot { it in entries } + if (missing.isNotEmpty()) { + throw GradleException("Minimized jar is missing required classes (over-pruned): $missing") + } + if (classCount > maxClasses) { + throw GradleException( + "Minimized jar retained $classCount classes (> $maxClasses) — minimize() is not pruning." + ) + } + logger.lifecycle("Minimized jar OK: $classCount classes retained.") + } +} + +tasks.named("check") { + dependsOn(verifyMinimizedJar) +} diff --git a/pulsar-client-shaded/build.gradle.kts b/pulsar-client-shaded/build.gradle.kts index 654e50f7273c5..dfd0f0597fcd7 100644 --- a/pulsar-client-shaded/build.gradle.kts +++ b/pulsar-client-shaded/build.gradle.kts @@ -24,7 +24,12 @@ plugins { dependencies { // Bundled into the shaded jar (kept on the runtime classpath so shadowJar packs them): - implementation(project(":pulsar-client-original")) + implementation(project(":pulsar-client-original")) { + // fastutil is bundled via :pulsar-client-dependencies-minimized, which packs only the + // (unrelocated) fastutil classes actually used instead of the full ~25MB jar. + exclude(group = "it.unimi.dsi", module = "fastutil") + } + implementation(project(":pulsar-client-dependencies-minimized")) implementation(project(":pulsar-client-messagecrypto-bc")) // Non-bundled runtime dependencies. These are the ONLY entries in the dependency-reduced diff --git a/settings.gradle.kts b/settings.gradle.kts index c847d5be210cb..d751adcc33678 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -191,6 +191,9 @@ include("pulsar-broker-auth-oidc") include("pulsar-broker-auth-sasl") include("pulsar-client-auth-sasl") +// Tier 9 — shaded utility modules (in core-modules) +include("pulsar-client-dependencies-minimized") + // Tier 10 — shaded client modules (in core-modules) include("pulsar-client-shaded") include("pulsar-client-all") From ce67fb44d3aa8f62a277e7163aa635ce84f0921d Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 15 Jun 2026 15:58:06 +0300 Subject: [PATCH 03/10] [improve][build] Upgrade gradleup shadow plugin to 9.4.2 Routine upgrade of the com.gradleup.shadow plugin used to build the shaded jars (including the new pulsar-client-dependencies-minimized minimization). Assisted-by: Claude Code --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 544289017647a..ae846de9b6bf9 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -176,7 +176,7 @@ thrift = "0.23.0" datasketches-memory = "4.1.0" datasketches-java = "7.0.1" # Shading -shadow = "9.4.1" +shadow = "9.4.2" [libraries] # SLF4J From ad052be02b82d000dca51b626a519b44f61a5c37 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 15 Jun 2026 16:12:58 +0300 Subject: [PATCH 04/10] [improve][build] Rename pulsar-client-dependencies-minimized to pulsar-client-fastutil-minimized MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Motivation The module exists solely to minimize the bundled fastutil classes, so the more specific name makes its purpose clear (and matches the "minimize this specific library" design driven by the `minimizedDependencies` list). ### Modifications - Rename the module directory and Gradle project to pulsar-client-fastutil-minimized (directory and project path match, so no module-name-vs-directory gotcha), and update the references in settings.gradle.kts, the client shade conventions and the three shaded client build files. - Simplify verifyMinimizedJar to a single class-count guard (fail if the jar retains more than 600 classes — actual is 591), dropping the per-class required-list check. The task stays configuration-cache compatible (captures only Providers/values, no Project access in the action). Assisted-by: Claude Code --- ...pulsar.client-shade-conventions.gradle.kts | 2 +- pulsar-client-admin-shaded/build.gradle.kts | 4 +- pulsar-client-all/build.gradle.kts | 4 +- .../build.gradle.kts | 39 ++++--------------- pulsar-client-shaded/build.gradle.kts | 4 +- settings.gradle.kts | 2 +- 6 files changed, 16 insertions(+), 39 deletions(-) rename {pulsar-client-dependencies-minimized => pulsar-client-fastutil-minimized}/build.gradle.kts (71%) diff --git a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts index a3ef4dfe83c85..d5e06b0913ce4 100644 --- a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts +++ b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts @@ -37,7 +37,7 @@ tasks.named("shadowJ include(project(":pulsar-client-admin-original")) include(project(":pulsar-common")) include(project(":pulsar-client-messagecrypto-bc")) - include(project(":pulsar-client-dependencies-minimized")) + include(project(":pulsar-client-fastutil-minimized")) include(dependency("com.fasterxml.jackson.*:.*")) include(dependency("com.google.*:.*")) include(dependency("com.google.auth:.*")) diff --git a/pulsar-client-admin-shaded/build.gradle.kts b/pulsar-client-admin-shaded/build.gradle.kts index 659f60244ee02..fbe36d769c2b2 100644 --- a/pulsar-client-admin-shaded/build.gradle.kts +++ b/pulsar-client-admin-shaded/build.gradle.kts @@ -25,11 +25,11 @@ plugins { dependencies { // Bundled into the shaded jar (kept on the runtime classpath so shadowJar packs them): implementation(project(":pulsar-client-admin-original")) { - // fastutil is bundled via :pulsar-client-dependencies-minimized, which packs only the + // fastutil is bundled via :pulsar-client-fastutil-minimized, which packs only the // (unrelocated) fastutil classes actually used instead of the full ~25MB jar. exclude(group = "it.unimi.dsi", module = "fastutil") } - implementation(project(":pulsar-client-dependencies-minimized")) + implementation(project(":pulsar-client-fastutil-minimized")) implementation(project(":pulsar-client-messagecrypto-bc")) // Non-bundled runtime dependencies for the dependency-reduced published POM/GMM (the `shadow` diff --git a/pulsar-client-all/build.gradle.kts b/pulsar-client-all/build.gradle.kts index eedbb2b667c64..6f7406d266ad4 100644 --- a/pulsar-client-all/build.gradle.kts +++ b/pulsar-client-all/build.gradle.kts @@ -25,11 +25,11 @@ plugins { dependencies { // Bundled into the shaded jar (kept on the runtime classpath so shadowJar packs them): implementation(project(":pulsar-client-original")) { - // fastutil is bundled via :pulsar-client-dependencies-minimized, which packs only the + // fastutil is bundled via :pulsar-client-fastutil-minimized, which packs only the // (unrelocated) fastutil classes actually used instead of the full ~25MB jar. exclude(group = "it.unimi.dsi", module = "fastutil") } - implementation(project(":pulsar-client-dependencies-minimized")) + implementation(project(":pulsar-client-fastutil-minimized")) implementation(project(":pulsar-client-admin-original")) implementation(project(":pulsar-client-messagecrypto-bc")) diff --git a/pulsar-client-dependencies-minimized/build.gradle.kts b/pulsar-client-fastutil-minimized/build.gradle.kts similarity index 71% rename from pulsar-client-dependencies-minimized/build.gradle.kts rename to pulsar-client-fastutil-minimized/build.gradle.kts index de495a0677ac0..6210f02f995a9 100644 --- a/pulsar-client-dependencies-minimized/build.gradle.kts +++ b/pulsar-client-fastutil-minimized/build.gradle.kts @@ -78,49 +78,26 @@ tasks.shadowJar { minimize() } -// --------------------------------------------------------------------------- -// Verification: fail the build if the entry points statically referenced by -// pulsar-client-original (NegativeAcksTracker, the only direct fastutil consumer) -// are ever dropped by over-pruning, or if minimize() silently becomes a no-op (which -// would ship the full fastutil jar). Add to this list when new usage is introduced. -// --------------------------------------------------------------------------- -val requiredMinimizedClasses = listOf( - "it/unimi/dsi/fastutil/longs/Long2ObjectAVLTreeMap.class", - "it/unimi/dsi/fastutil/longs/Long2ObjectMap.class", - "it/unimi/dsi/fastutil/longs/Long2ObjectOpenHashMap.class", - "it/unimi/dsi/fastutil/longs/Long2ObjectSortedMap.class", - "it/unimi/dsi/fastutil/longs/LongBidirectionalIterator.class", -) - -// Upper bound that comfortably exceeds the reachable set (~1k classes) but is well -// below the full fastutil jar (~12,965 classes), so a minimize() regression fails loudly. -val maxRetainedClasses = 5000 +// Verification: the reachable set is ~591 classes; this upper bound is comfortably +// above it but far below the full fastutil jar (~12,965 classes), so a minimize() +// regression (e.g. the no-op that ships the whole jar) fails the build. Bump it if +// new fastutil usage legitimately grows the minimized set past the limit. +val maxRetainedClasses = 600 val verifyMinimizedJar by tasks.registering { val jarFile = tasks.shadowJar.flatMap { it.archiveFile } - val required = requiredMinimizedClasses val maxClasses = maxRetainedClasses inputs.file(jarFile) doLast { - val jar = jarFile.get().asFile - val entries = mutableSetOf() - ZipFile(jar).use { zf -> - val e = zf.entries() - while (e.hasMoreElements()) { - entries.add(e.nextElement().name) - } - } - val classCount = entries.count { it.endsWith(".class") } - val missing = required.filterNot { it in entries } - if (missing.isNotEmpty()) { - throw GradleException("Minimized jar is missing required classes (over-pruned): $missing") + val classCount = ZipFile(jarFile.get().asFile).use { zf -> + zf.entries().asSequence().count { it.name.endsWith(".class") } } if (classCount > maxClasses) { throw GradleException( "Minimized jar retained $classCount classes (> $maxClasses) — minimize() is not pruning." ) } - logger.lifecycle("Minimized jar OK: $classCount classes retained.") + logger.lifecycle("Minimized fastutil jar OK: $classCount classes retained (limit $maxClasses).") } } diff --git a/pulsar-client-shaded/build.gradle.kts b/pulsar-client-shaded/build.gradle.kts index dfd0f0597fcd7..74399ac50d41f 100644 --- a/pulsar-client-shaded/build.gradle.kts +++ b/pulsar-client-shaded/build.gradle.kts @@ -25,11 +25,11 @@ plugins { dependencies { // Bundled into the shaded jar (kept on the runtime classpath so shadowJar packs them): implementation(project(":pulsar-client-original")) { - // fastutil is bundled via :pulsar-client-dependencies-minimized, which packs only the + // fastutil is bundled via :pulsar-client-fastutil-minimized, which packs only the // (unrelocated) fastutil classes actually used instead of the full ~25MB jar. exclude(group = "it.unimi.dsi", module = "fastutil") } - implementation(project(":pulsar-client-dependencies-minimized")) + implementation(project(":pulsar-client-fastutil-minimized")) implementation(project(":pulsar-client-messagecrypto-bc")) // Non-bundled runtime dependencies. These are the ONLY entries in the dependency-reduced diff --git a/settings.gradle.kts b/settings.gradle.kts index d751adcc33678..243aae2be9f52 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -192,7 +192,7 @@ include("pulsar-broker-auth-sasl") include("pulsar-client-auth-sasl") // Tier 9 — shaded utility modules (in core-modules) -include("pulsar-client-dependencies-minimized") +include("pulsar-client-fastutil-minimized") // Tier 10 — shaded client modules (in core-modules) include("pulsar-client-shaded") From 1921723eaf5821894305abb55210ba834720c3a5 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 15 Jun 2026 16:30:24 +0300 Subject: [PATCH 05/10] [improve][build] Extract a reusable pulsar.fastutil-minimized-conventions plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Motivation The fastutil-minimization setup (shadow minimize() seeded from `api` roots, bundle-only filter, stripped outgoing variants, class-count verification) was inline in pulsar-client-fastutil-minimized. A second minimization module is coming (pulsar-broker-fastutil-minimized), so the shared machinery moves into a convention plugin. ### Modifications - Add build-logic `pulsar.fastutil-minimized-conventions` precompiled script plugin and a `FastutilMinimizedExtension`. A consuming module declares its reachability roots as `api(project(...))` dependencies and sets `fastutilMinimized { maxRetainedClasses.set(N) }`; the plugin handles minimize()/include/extendsFrom-stripping and the verifyMinimizedJar check (configuration-cache compatible — the task action captures only Providers). - Reduce pulsar-client-fastutil-minimized/build.gradle.kts to apply the plugin, declare the pulsar-client-original root, and set the 600-class limit. Assisted-by: Claude Code --- .../main/kotlin/FastutilMinimizedExtension.kt | 39 ++++++++ ....fastutil-minimized-conventions.gradle.kts | 94 +++++++++++++++++++ .../build.gradle.kts | 88 ++--------------- 3 files changed, 143 insertions(+), 78 deletions(-) create mode 100644 build-logic/conventions/src/main/kotlin/FastutilMinimizedExtension.kt create mode 100644 build-logic/conventions/src/main/kotlin/pulsar.fastutil-minimized-conventions.gradle.kts diff --git a/build-logic/conventions/src/main/kotlin/FastutilMinimizedExtension.kt b/build-logic/conventions/src/main/kotlin/FastutilMinimizedExtension.kt new file mode 100644 index 0000000000000..a3d59077c083e --- /dev/null +++ b/build-logic/conventions/src/main/kotlin/FastutilMinimizedExtension.kt @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.gradle.api.provider.ListProperty +import org.gradle.api.provider.Property + +/** + * Configuration for the `pulsar.dependency-minimized-conventions` plugin. + * + * A "minimized" packaging module declares its reachability roots as `api(project(...))` + * dependencies and then only needs to express which libraries to minimize and the + * expected upper bound on the retained class count. + */ +interface FastutilMinimizedExtension { + /** Libraries to minimize, as `"group:name"` entries. Defaults to `it.unimi.dsi:fastutil`. */ + val minimizedDependencies: ListProperty + + /** + * Upper bound on the number of `.class` entries retained in the shaded jar. The build fails + * if the jar exceeds it — this catches a minimize() regression that would ship the full jar. + */ + val maxRetainedClasses: Property +} diff --git a/build-logic/conventions/src/main/kotlin/pulsar.fastutil-minimized-conventions.gradle.kts b/build-logic/conventions/src/main/kotlin/pulsar.fastutil-minimized-conventions.gradle.kts new file mode 100644 index 0000000000000..2f863972b3db1 --- /dev/null +++ b/build-logic/conventions/src/main/kotlin/pulsar.fastutil-minimized-conventions.gradle.kts @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Imported explicitly: the `java` plugin contributes a `java { }` extension accessor, +// so an unqualified `java.util.zip.ZipFile` would resolve `java` to that extension. +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar +import java.util.zip.ZipFile + +// Convention for " minimized" packaging modules. Produces a shadow jar that +// contains only the classes of the minimized libraries that are actually reachable from +// the module's reachability roots and their transitive closure. +// +// A consuming module: +// * applies this plugin, +// * declares its reachability roots as `api(project(...))` dependencies (e.g. the +// pulsar projects that use the library being minimized), and +// * sets `fastutilMinimized { maxRetainedClasses.set(N) }`. +// +// Why `api`: Shadow's minimize() seeds its reachability analysis (UnusedTracker) from +// the project's own source classes plus its `api`-scoped jars. A packaging module has no +// source, so the `api` roots are what drive the analysis; with `implementation` (or no +// roots) minimize() has nothing to start from and keeps the whole jar. + +plugins { + id("pulsar.java-conventions") + id("pulsar.shadow-conventions") +} + +val minimized = extensions.create("fastutilMinimized") +minimized.minimizedDependencies.convention(listOf("it.unimi.dsi:fastutil")) + +// The `api` roots are a build-only reachability seed; strip them (and everything else) +// from the consumable variants so this module ships a self-contained jar with no +// transitive dependencies on consumers' classpaths. +listOf("apiElements", "runtimeElements").forEach { variant -> + configurations.named(variant) { + setExtendsFrom(emptySet()) + } +} + +tasks.named("shadowJar") { + val minimizedDeps = minimized.minimizedDependencies + inputs.property("minimizedDependencies", minimizedDeps) + // Bundle ONLY the minimized libraries; the `api` roots are read by minimize() as + // reachability roots but are not part of the output jar. + dependencies { + minimizedDeps.get().forEach { coords -> include(dependency("$coords:.*")) } + } + // Drop every bundled class not reachable from the reachability roots. + minimize() +} + +// Verify the jar was actually pruned: the reachable set is small, so a count well above +// it but far below the full library jar catches a minimize() regression (e.g. the no-op +// that ships the whole jar). Stays configuration-cache compatible — the action captures +// only Providers, never the Project. +val verifyMinimizedJar = tasks.register("verifyMinimizedJar") { + val jarFile = tasks.named("shadowJar").flatMap { it.archiveFile } + val maxClasses = minimized.maxRetainedClasses + inputs.file(jarFile) + inputs.property("maxRetainedClasses", maxClasses) + doLast { + val limit = maxClasses.get() + val classCount = ZipFile(jarFile.get().asFile).use { zf -> + zf.entries().asSequence().count { it.name.endsWith(".class") } + } + if (classCount > limit) { + throw GradleException( + "Minimized jar retained $classCount classes (> $limit) — minimize() is not pruning." + ) + } + logger.lifecycle("Minimized jar OK: $classCount classes retained (limit $limit).") + } +} + +tasks.named("check") { + dependsOn(verifyMinimizedJar) +} diff --git a/pulsar-client-fastutil-minimized/build.gradle.kts b/pulsar-client-fastutil-minimized/build.gradle.kts index 6210f02f995a9..bee29c1b83110 100644 --- a/pulsar-client-fastutil-minimized/build.gradle.kts +++ b/pulsar-client-fastutil-minimized/build.gradle.kts @@ -17,90 +17,22 @@ * under the License. */ -// Imported explicitly: the `java` plugin contributes a `java { }` extension accessor, -// so an unqualified `java.util.zip.ZipFile` would resolve `java` to that extension. -import java.util.zip.ZipFile - -// Produces a jar containing ONLY the classes of the "minimized" dependencies that -// are reachable from pulsar-client-original (and its full transitive closure) — -// exactly the classes that end up shaded into the final pulsar-client jars. It is -// bundled (and relocated) by :pulsar-client-shaded, :pulsar-client-all and -// :pulsar-client-admin-shaded so the full dependency jars are not shipped. -// -// This is the Gradle equivalent of the branch-4.2 Maven module, which uses -// maven-shade-plugin's over {pulsar-client-original, fastutil}. -// -// How it works: -// * pulsar-client-original is declared with the `api` scope. The Shadow plugin's -// minimize() seeds its reachability analysis (UnusedTracker) from the project's -// own source classes plus its `api`-scoped jars — so the entire pulsar-client -// closure becomes the set of reachability roots. (With `implementation`, or no -// source, minimize() has no roots and keeps the full ~12,900-class fastutil jar.) -// * Only the minimized libraries are bundled into the jar (the `include` filter -// below); pulsar-client-original itself is read purely as a reachability root. -// * minimize() then drops every bundled class not reachable from those roots. +// Jar of just the fastutil classes reachable from the Pulsar client. It is bundled (and +// relocated) by :pulsar-client-shaded, :pulsar-client-all and :pulsar-client-admin-shaded +// so the full ~25MB fastutil jar is not shipped. See pulsar.fastutil-minimized-conventions. plugins { - id("pulsar.java-conventions") - id("pulsar.shadow-conventions") + id("pulsar.fastutil-minimized-conventions") } -// Dependencies to minimize, as "group:name" entries. Only the classes from these -// artifacts that are actually reachable from the pulsar-client closure are kept. Add -// more entries here when another heavy dependency needs to be minimized the same way. -val minimizedDependencies: List = listOf( - "it.unimi.dsi:fastutil", -) - dependencies { - // `api` (not `implementation`) so minimize() uses pulsar-client-original and its - // transitive closure as the reachability roots. Its own classes are not bundled. + // Reachability root: pulsar-client-original (its NegativeAcksTracker is the only direct + // fastutil consumer on the client side). minimize() seeds from its transitive closure. api(project(":pulsar-client-original")) } -// The api dependency above is a BUILD-ONLY reachability seed for minimize(). This module -// ships a self-contained jar of minimized fastutil classes and must not drag -// pulsar-client-original (or anything else) onto consumers' classpaths, so strip all -// inherited dependencies from the consumable (outgoing) variants. -listOf("apiElements", "runtimeElements").forEach { variant -> - configurations.named(variant) { - setExtendsFrom(emptySet()) - } -} - -tasks.shadowJar { - // Bundle ONLY the minimized libraries; pulsar-client-original is read by minimize() - // as a reachability root but is excluded from the output jar. - dependencies { - minimizedDependencies.forEach { coords -> include(dependency("$coords:.*")) } - } - // Drop every bundled class not reachable from the api reachability roots above. - minimize() -} - -// Verification: the reachable set is ~591 classes; this upper bound is comfortably -// above it but far below the full fastutil jar (~12,965 classes), so a minimize() -// regression (e.g. the no-op that ships the whole jar) fails the build. Bump it if -// new fastutil usage legitimately grows the minimized set past the limit. -val maxRetainedClasses = 600 - -val verifyMinimizedJar by tasks.registering { - val jarFile = tasks.shadowJar.flatMap { it.archiveFile } - val maxClasses = maxRetainedClasses - inputs.file(jarFile) - doLast { - val classCount = ZipFile(jarFile.get().asFile).use { zf -> - zf.entries().asSequence().count { it.name.endsWith(".class") } - } - if (classCount > maxClasses) { - throw GradleException( - "Minimized jar retained $classCount classes (> $maxClasses) — minimize() is not pruning." - ) - } - logger.lifecycle("Minimized fastutil jar OK: $classCount classes retained (limit $maxClasses).") - } -} - -tasks.named("check") { - dependsOn(verifyMinimizedJar) +fastutilMinimized { + // The reachable set is ~591 classes; fail the build if it grows past this (e.g. if + // minimize() regresses and ships the full ~12,965-class jar). + maxRetainedClasses.set(600) } From b22b6288144b61edb75ca85ffc9d4f9b1f6cb501 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 15 Jun 2026 16:30:46 +0300 Subject: [PATCH 06/10] [improve][dist] Ship minimized fastutil in the server distribution instead of the full jar MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Motivation One argument for dropping fastutil (#25413) was that the full ~25MB jar enlarges the server distribution / docker image. Shipping a minimized fastutil that contains only the classes actually used on the server (and the bundled, unrelocated client) side resolves that, while keeping the convenience that any Pulsar code can pick a new fastutil collection and the build automatically pulls in the classes it needs. ### Modifications - Add pulsar-broker-fastutil-minimized (uses pulsar.fastutil-minimized-conventions) with pulsar-broker and pulsar-client-original as reachability roots — a superset of the client minimized set (~818 classes vs ~591). - In the server distribution, exclude the full it.unimi.dsi:fastutil jar from distLib and bundle pulsar-broker-fastutil-minimized instead. The client-only pulsar-client-fastutil-minimized is not pulled into the server distribution. - Drop the now-stale fastutil entry from the server binary LICENSE (checkBinaryLicense passes; the minimized classes ship inside a Pulsar-owned jar). Assisted-by: Claude Code --- distribution/server/build.gradle.kts | 6 +++ .../server/src/assemble/LICENSE.bin.txt | 1 - .../build.gradle.kts | 41 +++++++++++++++++++ settings.gradle.kts | 1 + 4 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker-fastutil-minimized/build.gradle.kts diff --git a/distribution/server/build.gradle.kts b/distribution/server/build.gradle.kts index 7f489628adfb3..2ab146787c694 100644 --- a/distribution/server/build.gradle.kts +++ b/distribution/server/build.gradle.kts @@ -71,6 +71,9 @@ val distLib by configurations.creating { exclude(group = "com.google.android", module = "annotations") // Annotation libraries not needed at runtime exclude(group = "org.codehaus.mojo", module = "animal-sniffer-annotations") + // The full fastutil jar (~25MB) is replaced by :pulsar-broker-fastutil-minimized below, + // which ships only the fastutil classes actually used on the server (and client) side. + exclude(group = "it.unimi.dsi", module = "fastutil") } // Resolvable configurations for cross-project artifact dependencies. @@ -91,6 +94,9 @@ dependencies { // Version constraints from the enforced platform (inherited via implementation, // which distLib extends) ensure consistent versions without manual resolutionStrategy. distLib(project(":pulsar-broker")) + // Minimized fastutil (replaces the full fastutil jar excluded from distLib above): only the + // fastutil classes reachable from the broker and the bundled pulsar-client-original. + distLib(project(":pulsar-broker-fastutil-minimized")) distLib(project(":pulsar-metadata")) distLib(project(":pulsar-docs-tools")) distLib(project(":pulsar-proxy")) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 07c779abf65ab..b330e1cea86e6 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -262,7 +262,6 @@ The Apache Software License, Version 2.0 - com.fasterxml.jackson.module-jackson-module-parameter-names-2.21.3.jar * Caffeine -- com.github.ben-manes.caffeine-caffeine-3.2.4.jar * Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar - * Fastutil -- it.unimi.dsi-fastutil-8.5.18.jar * LMAX Disruptor -- com.lmax-disruptor-4.0.0.jar * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.63.2.jar * Bitbucket -- org.bitbucket.b_c-jose4j-0.9.6.jar diff --git a/pulsar-broker-fastutil-minimized/build.gradle.kts b/pulsar-broker-fastutil-minimized/build.gradle.kts new file mode 100644 index 0000000000000..53b37e2a83418 --- /dev/null +++ b/pulsar-broker-fastutil-minimized/build.gradle.kts @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Jar of just the fastutil classes reachable from the Pulsar server side. It replaces the +// full fastutil jar in the server distribution so the docker image / tarball ships only the +// classes actually used. The roots cover both the broker and (since it is bundled in the +// server distribution) the unrelocated pulsar-client-original, so this is a superset of +// :pulsar-client-fastutil-minimized. See pulsar.fastutil-minimized-conventions. + +plugins { + id("pulsar.fastutil-minimized-conventions") +} + +dependencies { + // Reachability roots: every Pulsar project that uses fastutil and ends up in the server + // distribution. minimize() keeps the union of fastutil classes reachable from these. + api(project(":pulsar-broker")) + api(project(":pulsar-client-original")) +} + +fastutilMinimized { + // The reachable set (broker + client usage) is ~818 classes; fail the build if it grows + // past this (bump it when new fastutil usage legitimately enlarges the set). + maxRetainedClasses.set(850) +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 243aae2be9f52..4b8c0b2850a24 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -193,6 +193,7 @@ include("pulsar-client-auth-sasl") // Tier 9 — shaded utility modules (in core-modules) include("pulsar-client-fastutil-minimized") +include("pulsar-broker-fastutil-minimized") // Tier 10 — shaded client modules (in core-modules) include("pulsar-client-shaded") From d1434a4e679839068f460af785e839788a9773f7 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 15 Jun 2026 16:43:38 +0300 Subject: [PATCH 07/10] [improve][build] Publish pulsar-client-original with minimized fastutil in its POM/GMM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Motivation Maven/Gradle consumers of the (non-shaded) pulsar-client-original currently pull the full ~25MB fastutil jar, even though the client only uses a few fastutil classes. Replacing that dependency with pulsar-client-fastutil-minimized in the published metadata lets consumers get just the classes the client needs. ### Modifications - Publish pulsar-client-fastutil-minimized (add pulsar.publish-conventions). This also makes it a published dependency, satisfying the public-java-library "published modules only depend on published modules" check for the shaded client modules that bundle it. The broker variant stays unpublished (only the server distribution consumes it). - In pulsar-client (pulsar-client-original), rewrite the PUBLISHED POM (pom.withXml) and Gradle Module Metadata (post-process the generated .module JSON) to replace it.unimi.dsi:fastutil with org.apache.pulsar:pulsar-client-fastutil-minimized. - This is publication-only: intra-build, pulsar-client-original keeps exposing full fastutil (pulsar-broker depends on it and uses more fastutil classes than the client minimized set), and there is no build-graph dependency on the minimized module — it is built only when the shaded jars (or its own publication) are, not on every client code change. Assisted-by: Claude Code --- .../build.gradle.kts | 4 ++ pulsar-client/build.gradle.kts | 67 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/pulsar-client-fastutil-minimized/build.gradle.kts b/pulsar-client-fastutil-minimized/build.gradle.kts index bee29c1b83110..02ef09a984d0b 100644 --- a/pulsar-client-fastutil-minimized/build.gradle.kts +++ b/pulsar-client-fastutil-minimized/build.gradle.kts @@ -23,6 +23,10 @@ plugins { id("pulsar.fastutil-minimized-conventions") + // Published to Maven so it can be referenced from pulsar-client-original's published metadata + // (where the full fastutil dependency is replaced by this minimized jar). The broker variant is + // not published — it is only consumed by the server distribution. + id("pulsar.publish-conventions") } dependencies { diff --git a/pulsar-client/build.gradle.kts b/pulsar-client/build.gradle.kts index da53aeba55d5a..5738c85a0c9f1 100644 --- a/pulsar-client/build.gradle.kts +++ b/pulsar-client/build.gradle.kts @@ -17,6 +17,8 @@ * under the License. */ +import org.gradle.api.publish.tasks.GenerateModuleMetadata + plugins { id("pulsar.public-java-library-conventions") alias(libs.plugins.protobuf) @@ -110,3 +112,68 @@ val generateTestAvro by tasks.registering(JavaExec::class) { sourceSets["test"].java.srcDir(generateTestAvro.map { layout.buildDirectory.dir("generated-sources/avro-test").get() }) tasks.named("compileTestJava") { dependsOn(generateTestAvro) } + +// --------------------------------------------------------------------------- +// Publish-time dependency replacement: in the PUBLISHED POM and Gradle Module Metadata only, +// replace the full fastutil dependency with the minimized :pulsar-client-fastutil-minimized jar, +// so Maven/Gradle consumers of pulsar-client-original pull in just the fastutil classes the client +// uses instead of the full ~25MB jar. +// +// This is publication-only on purpose: intra-build, pulsar-client-original keeps exposing the full +// fastutil (pulsar-broker depends on it and uses more fastutil classes than the client minimized +// set, so a build-scope swap would break the broker compile). There is deliberately no build-graph +// dependency on the minimized module either — it is built only when the shaded jars (or its own +// publication) are built, not every time a client class changes. +run { + val fromGroup = "it.unimi.dsi" + val fromName = "fastutil" + val toGroup = "org.apache.pulsar" + val toName = "pulsar-client-fastutil-minimized" + val toVersion = version.toString() + + // POM (Maven consumers): rewrite the fastutil block in place. + publishing.publications.withType().configureEach { + pom.withXml { + val sb = asString() + val replaced = sb.toString().replace( + Regex( + "\\s*\\Q$fromGroup\\E\\s*" + + "\\Q$fromName\\E.*?", + RegexOption.DOT_MATCHES_ALL + ), + "\n $toGroup\n" + + " $toName\n" + + " $toVersion\n" + + " runtime\n " + ) + sb.setLength(0) + sb.append(replaced) + } + } + + // Gradle Module Metadata (Gradle consumers): post-process the generated .module JSON, since GMM + // has no per-dependency rewrite API. Runs as the tail of the generate task so the published (and + // signed) file is the rewritten one. Captures only Providers/strings -> configuration-cache safe. + tasks.withType().configureEach { + val moduleFile = outputFile + doLast { + val file = moduleFile.get().asFile + @Suppress("UNCHECKED_CAST") + val json = groovy.json.JsonSlurper().parse(file) as MutableMap + val variants = json["variants"] as? List<*> ?: emptyList() + for (variant in variants) { + val deps = (variant as? Map<*, *>)?.get("dependencies") as? List<*> ?: continue + for (dep in deps) { + @Suppress("UNCHECKED_CAST") + val d = dep as? MutableMap ?: continue + if (d["group"] == fromGroup && d["module"] == fromName) { + d["group"] = toGroup + d["module"] = toName + d["version"] = linkedMapOf("requires" to toVersion) + } + } + } + file.writeText(groovy.json.JsonOutput.prettyPrint(groovy.json.JsonOutput.toJson(json))) + } + } +} From 422b2fd79cb8c12ef26fb28a75116f99cf2dea1f Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 15 Jun 2026 16:59:10 +0300 Subject: [PATCH 08/10] [improve][build] Make the minimization convention plugin generic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The convention is not specific to fastutil — it minimizes any configured library. Rename pulsar.fastutil-minimized-conventions to pulsar.minimized-dependencies-conventions and FastutilMinimizedExtension to MinimizedDependenciesExtension, expose it as `minimizedJar { }`, and drop the fastutil-specific default so each module lists its own `minimizedDependencies`. The fastutil modules now set `minimizedJar { minimizedDependencies.set(listOf("it.unimi.dsi:fastutil")); ... }`. Assisted-by: Claude Code --- ...mizedExtension.kt => MinimizedDependenciesExtension.kt} | 6 +++--- ...> pulsar.minimized-dependencies-conventions.gradle.kts} | 7 +++---- pulsar-broker-fastutil-minimized/build.gradle.kts | 7 ++++--- pulsar-client-fastutil-minimized/build.gradle.kts | 7 ++++--- 4 files changed, 14 insertions(+), 13 deletions(-) rename build-logic/conventions/src/main/kotlin/{FastutilMinimizedExtension.kt => MinimizedDependenciesExtension.kt} (87%) rename build-logic/conventions/src/main/kotlin/{pulsar.fastutil-minimized-conventions.gradle.kts => pulsar.minimized-dependencies-conventions.gradle.kts} (93%) diff --git a/build-logic/conventions/src/main/kotlin/FastutilMinimizedExtension.kt b/build-logic/conventions/src/main/kotlin/MinimizedDependenciesExtension.kt similarity index 87% rename from build-logic/conventions/src/main/kotlin/FastutilMinimizedExtension.kt rename to build-logic/conventions/src/main/kotlin/MinimizedDependenciesExtension.kt index a3d59077c083e..bade099dbd553 100644 --- a/build-logic/conventions/src/main/kotlin/FastutilMinimizedExtension.kt +++ b/build-logic/conventions/src/main/kotlin/MinimizedDependenciesExtension.kt @@ -21,14 +21,14 @@ import org.gradle.api.provider.ListProperty import org.gradle.api.provider.Property /** - * Configuration for the `pulsar.dependency-minimized-conventions` plugin. + * Configuration for the `pulsar.minimized-dependencies-conventions` plugin. * * A "minimized" packaging module declares its reachability roots as `api(project(...))` * dependencies and then only needs to express which libraries to minimize and the * expected upper bound on the retained class count. */ -interface FastutilMinimizedExtension { - /** Libraries to minimize, as `"group:name"` entries. Defaults to `it.unimi.dsi:fastutil`. */ +interface MinimizedDependenciesExtension { + /** Libraries to minimize, as `"group:name"` entries (e.g. `"it.unimi.dsi:fastutil"`). */ val minimizedDependencies: ListProperty /** diff --git a/build-logic/conventions/src/main/kotlin/pulsar.fastutil-minimized-conventions.gradle.kts b/build-logic/conventions/src/main/kotlin/pulsar.minimized-dependencies-conventions.gradle.kts similarity index 93% rename from build-logic/conventions/src/main/kotlin/pulsar.fastutil-minimized-conventions.gradle.kts rename to build-logic/conventions/src/main/kotlin/pulsar.minimized-dependencies-conventions.gradle.kts index 2f863972b3db1..9a91e6181ea9b 100644 --- a/build-logic/conventions/src/main/kotlin/pulsar.fastutil-minimized-conventions.gradle.kts +++ b/build-logic/conventions/src/main/kotlin/pulsar.minimized-dependencies-conventions.gradle.kts @@ -29,8 +29,8 @@ import java.util.zip.ZipFile // A consuming module: // * applies this plugin, // * declares its reachability roots as `api(project(...))` dependencies (e.g. the -// pulsar projects that use the library being minimized), and -// * sets `fastutilMinimized { maxRetainedClasses.set(N) }`. +// pulsar projects that use the libraries being minimized), and +// * configures `minimizedJar { minimizedDependencies.set(listOf("group:name")); maxRetainedClasses.set(N) }`. // // Why `api`: Shadow's minimize() seeds its reachability analysis (UnusedTracker) from // the project's own source classes plus its `api`-scoped jars. A packaging module has no @@ -42,8 +42,7 @@ plugins { id("pulsar.shadow-conventions") } -val minimized = extensions.create("fastutilMinimized") -minimized.minimizedDependencies.convention(listOf("it.unimi.dsi:fastutil")) +val minimized = extensions.create("minimizedJar") // The `api` roots are a build-only reachability seed; strip them (and everything else) // from the consumable variants so this module ships a self-contained jar with no diff --git a/pulsar-broker-fastutil-minimized/build.gradle.kts b/pulsar-broker-fastutil-minimized/build.gradle.kts index 53b37e2a83418..54dc1e5beacf4 100644 --- a/pulsar-broker-fastutil-minimized/build.gradle.kts +++ b/pulsar-broker-fastutil-minimized/build.gradle.kts @@ -21,10 +21,10 @@ // full fastutil jar in the server distribution so the docker image / tarball ships only the // classes actually used. The roots cover both the broker and (since it is bundled in the // server distribution) the unrelocated pulsar-client-original, so this is a superset of -// :pulsar-client-fastutil-minimized. See pulsar.fastutil-minimized-conventions. +// :pulsar-client-fastutil-minimized. See pulsar.minimized-dependencies-conventions. plugins { - id("pulsar.fastutil-minimized-conventions") + id("pulsar.minimized-dependencies-conventions") } dependencies { @@ -34,7 +34,8 @@ dependencies { api(project(":pulsar-client-original")) } -fastutilMinimized { +minimizedJar { + minimizedDependencies.set(listOf("it.unimi.dsi:fastutil")) // The reachable set (broker + client usage) is ~818 classes; fail the build if it grows // past this (bump it when new fastutil usage legitimately enlarges the set). maxRetainedClasses.set(850) diff --git a/pulsar-client-fastutil-minimized/build.gradle.kts b/pulsar-client-fastutil-minimized/build.gradle.kts index 02ef09a984d0b..4ad06c42e5c2f 100644 --- a/pulsar-client-fastutil-minimized/build.gradle.kts +++ b/pulsar-client-fastutil-minimized/build.gradle.kts @@ -19,10 +19,10 @@ // Jar of just the fastutil classes reachable from the Pulsar client. It is bundled (and // relocated) by :pulsar-client-shaded, :pulsar-client-all and :pulsar-client-admin-shaded -// so the full ~25MB fastutil jar is not shipped. See pulsar.fastutil-minimized-conventions. +// so the full ~25MB fastutil jar is not shipped. See pulsar.minimized-dependencies-conventions. plugins { - id("pulsar.fastutil-minimized-conventions") + id("pulsar.minimized-dependencies-conventions") // Published to Maven so it can be referenced from pulsar-client-original's published metadata // (where the full fastutil dependency is replaced by this minimized jar). The broker variant is // not published — it is only consumed by the server distribution. @@ -35,7 +35,8 @@ dependencies { api(project(":pulsar-client-original")) } -fastutilMinimized { +minimizedJar { + minimizedDependencies.set(listOf("it.unimi.dsi:fastutil")) // The reachable set is ~591 classes; fail the build if it grows past this (e.g. if // minimize() regresses and ships the full ~12,965-class jar). maxRetainedClasses.set(600) From ac398138616172d085dbc2eb59d1db1a3f30b7c8 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 15 Jun 2026 16:59:24 +0300 Subject: [PATCH 09/10] [fix][dist] Keep the fastutil attribution in the server LICENSE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The full fastutil jar was replaced in the server distribution by the minimized pulsar-broker-fastutil-minimized jar, so the previous "Fastutil -- ...jar" line (which named a no-longer-bundled jar) was removed. The fastutil classes (Apache-2.0) still ship, bundled inside that Pulsar jar, so restore a fastutil attribution as free text without a jar filename — accepted by checkBinaryLicense, which only validates entries that reference a concrete *.jar. Assisted-by: Claude Code --- distribution/server/src/assemble/LICENSE.bin.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index b330e1cea86e6..b6920d8bf92fd 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -262,6 +262,7 @@ The Apache Software License, Version 2.0 - com.fasterxml.jackson.module-jackson-module-parameter-names-2.21.3.jar * Caffeine -- com.github.ben-manes.caffeine-caffeine-3.2.4.jar * Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar + * Fastutil -- it.unimi.dsi:fastutil (only the classes used by Pulsar, bundled within pulsar-broker-fastutil-minimized) * LMAX Disruptor -- com.lmax-disruptor-4.0.0.jar * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.63.2.jar * Bitbucket -- org.bitbucket.b_c-jose4j-0.9.6.jar From 681670ab01baf8e1c2e307015c3837713a97c064 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 15 Jun 2026 18:45:07 +0300 Subject: [PATCH 10/10] [improve][build] Ship minimized fastutil in the shell distribution ### Motivation The server distribution was switched to the minimized fastutil jar, but the pulsar-shell CLI distribution still bundled the full ~24MB fastutil (pulled transitively via pulsar-client-tools), which is inconsistent and inflates the shell tarball / image. ### Modifications - distribution/shell: exclude the full it.unimi.dsi:fastutil from distLib and bundle pulsar-client-fastutil-minimized instead (the unrelocated minimized client set works for the shell's client-side modules). The shell binary LICENSE keeps a free-text fastutil attribution without a jar filename, matching the server distribution. - Broaden pulsar-client-fastutil-minimized's reachability roots to pulsar-client-tools and pulsar-client-admin-original in addition to pulsar-client-original, so the minimized set covers every fastutil class any client-side module reaches. There is no such usage today (the retained set stays 591 classes), but this future-proofs the set: if these modules start using fastutil, the needed classes are pulled in automatically. Verified the set is self-contained (0 closure gaps) and covers all 10 fastutil references in pulsar-client-original. Assisted-by: Claude Code --- distribution/shell/build.gradle.kts | 6 ++++++ distribution/shell/src/assemble/LICENSE.bin.txt | 2 +- pulsar-client-fastutil-minimized/build.gradle.kts | 10 ++++++++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/distribution/shell/build.gradle.kts b/distribution/shell/build.gradle.kts index 9aa6b3d18e2bb..05452ce0aa55b 100644 --- a/distribution/shell/build.gradle.kts +++ b/distribution/shell/build.gradle.kts @@ -42,9 +42,15 @@ val distLib by configurations.creating { exclude(group = "net.java.dev.jna", module = "jna-platform") exclude(group = "io.netty", module = "netty-transport-native-kqueue") exclude(group = "io.prometheus", module = "simpleclient_caffeine") + // The full fastutil jar (~24MB) is replaced by :pulsar-client-fastutil-minimized below, + // which ships only the fastutil classes the client-side modules actually use. + exclude(group = "it.unimi.dsi", module = "fastutil") } dependencies { distLib(project(":pulsar-client-tools")) + // Minimized fastutil (replaces the full fastutil jar excluded from distLib above): only the + // fastutil classes reachable from the client-side modules (client, client-tools, admin). + distLib(project(":pulsar-client-fastutil-minimized")) distLib(libs.log4j.core) distLib(libs.log4j.web) distLib(libs.log4j.layout.template.json) diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 4f000e2c92a8b..3b1bc4f8800b2 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -427,7 +427,7 @@ The Apache Software License, Version 2.0 * RE2j -- re2j-1.8.jar * Spotify completable-futures -- completable-futures-0.3.6.jar * RoaringBitmap -- RoaringBitmap-1.6.9.jar - * Fastutil -- fastutil-8.5.18.jar + * Fastutil -- it.unimi.dsi:fastutil (only the classes used by Pulsar, bundled within pulsar-client-fastutil-minimized) * JSpecify -- jspecify-1.0.0.jar * JetBrains Annotations -- annotations-26.1.0.jar diff --git a/pulsar-client-fastutil-minimized/build.gradle.kts b/pulsar-client-fastutil-minimized/build.gradle.kts index 4ad06c42e5c2f..4225bd63089ac 100644 --- a/pulsar-client-fastutil-minimized/build.gradle.kts +++ b/pulsar-client-fastutil-minimized/build.gradle.kts @@ -30,9 +30,15 @@ plugins { } dependencies { - // Reachability root: pulsar-client-original (its NegativeAcksTracker is the only direct - // fastutil consumer on the client side). minimize() seeds from its transitive closure. + // Reachability roots: the client-side modules that ship together (in the client shaded jars, + // the published pulsar-client-original, and the shell distribution). minimize() seeds from + // their transitive closures, so this jar contains every fastutil class any of them reach. + // Today only pulsar-client-original (NegativeAcksTracker) uses fastutil directly, but listing + // pulsar-client-tools and pulsar-client-admin-original as roots future-proofs the set: if they + // start using fastutil, the needed classes are pulled in automatically with no further wiring. api(project(":pulsar-client-original")) + api(project(":pulsar-client-tools")) + api(project(":pulsar-client-admin-original")) } minimizedJar {