diff --git a/build-logic/conventions/src/main/kotlin/MinimizedDependenciesExtension.kt b/build-logic/conventions/src/main/kotlin/MinimizedDependenciesExtension.kt new file mode 100644 index 0000000000000..bade099dbd553 --- /dev/null +++ b/build-logic/conventions/src/main/kotlin/MinimizedDependenciesExtension.kt @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.gradle.api.provider.ListProperty +import org.gradle.api.provider.Property + +/** + * Configuration for the `pulsar.minimized-dependencies-conventions` plugin. + * + * A "minimized" packaging module declares its reachability roots as `api(project(...))` + * dependencies and then only needs to express which libraries to minimize and the + * expected upper bound on the retained class count. + */ +interface MinimizedDependenciesExtension { + /** Libraries to minimize, as `"group:name"` entries (e.g. `"it.unimi.dsi:fastutil"`). */ + val minimizedDependencies: ListProperty + + /** + * Upper bound on the number of `.class` entries retained in the shaded jar. The build fails + * if the jar exceeds it — this catches a minimize() regression that would ship the full jar. + */ + val maxRetainedClasses: Property +} diff --git a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts index 60ed0cfa67cf3..d5e06b0913ce4 100644 --- a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts +++ b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts @@ -37,6 +37,7 @@ tasks.named("shadowJ include(project(":pulsar-client-admin-original")) include(project(":pulsar-common")) include(project(":pulsar-client-messagecrypto-bc")) + include(project(":pulsar-client-fastutil-minimized")) include(dependency("com.fasterxml.jackson.*:.*")) include(dependency("com.google.*:.*")) include(dependency("com.google.auth:.*")) @@ -143,6 +144,7 @@ tasks.named("shadowJ relocateWithPrefix(shadePrefix, "io.opencensus") relocateWithPrefix(shadePrefix, "io.prometheus.client") relocateWithPrefix(shadePrefix, "io.swagger") + relocateWithPrefix(shadePrefix, "it.unimi.dsi.fastutil") relocateWithPrefix(shadePrefix, "javassist") relocateWithPrefix(shadePrefix, "jakarta.activation") relocateWithPrefix(shadePrefix, "jakarta.annotation") diff --git a/build-logic/conventions/src/main/kotlin/pulsar.minimized-dependencies-conventions.gradle.kts b/build-logic/conventions/src/main/kotlin/pulsar.minimized-dependencies-conventions.gradle.kts new file mode 100644 index 0000000000000..9a91e6181ea9b --- /dev/null +++ b/build-logic/conventions/src/main/kotlin/pulsar.minimized-dependencies-conventions.gradle.kts @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Imported explicitly: the `java` plugin contributes a `java { }` extension accessor, +// so an unqualified `java.util.zip.ZipFile` would resolve `java` to that extension. +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar +import java.util.zip.ZipFile + +// Convention for " minimized" packaging modules. Produces a shadow jar that +// contains only the classes of the minimized libraries that are actually reachable from +// the module's reachability roots and their transitive closure. +// +// A consuming module: +// * applies this plugin, +// * declares its reachability roots as `api(project(...))` dependencies (e.g. the +// pulsar projects that use the libraries being minimized), and +// * configures `minimizedJar { minimizedDependencies.set(listOf("group:name")); maxRetainedClasses.set(N) }`. +// +// Why `api`: Shadow's minimize() seeds its reachability analysis (UnusedTracker) from +// the project's own source classes plus its `api`-scoped jars. A packaging module has no +// source, so the `api` roots are what drive the analysis; with `implementation` (or no +// roots) minimize() has nothing to start from and keeps the whole jar. + +plugins { + id("pulsar.java-conventions") + id("pulsar.shadow-conventions") +} + +val minimized = extensions.create("minimizedJar") + +// The `api` roots are a build-only reachability seed; strip them (and everything else) +// from the consumable variants so this module ships a self-contained jar with no +// transitive dependencies on consumers' classpaths. +listOf("apiElements", "runtimeElements").forEach { variant -> + configurations.named(variant) { + setExtendsFrom(emptySet()) + } +} + +tasks.named("shadowJar") { + val minimizedDeps = minimized.minimizedDependencies + inputs.property("minimizedDependencies", minimizedDeps) + // Bundle ONLY the minimized libraries; the `api` roots are read by minimize() as + // reachability roots but are not part of the output jar. + dependencies { + minimizedDeps.get().forEach { coords -> include(dependency("$coords:.*")) } + } + // Drop every bundled class not reachable from the reachability roots. + minimize() +} + +// Verify the jar was actually pruned: the reachable set is small, so a count well above +// it but far below the full library jar catches a minimize() regression (e.g. the no-op +// that ships the whole jar). Stays configuration-cache compatible — the action captures +// only Providers, never the Project. +val verifyMinimizedJar = tasks.register("verifyMinimizedJar") { + val jarFile = tasks.named("shadowJar").flatMap { it.archiveFile } + val maxClasses = minimized.maxRetainedClasses + inputs.file(jarFile) + inputs.property("maxRetainedClasses", maxClasses) + doLast { + val limit = maxClasses.get() + val classCount = ZipFile(jarFile.get().asFile).use { zf -> + zf.entries().asSequence().count { it.name.endsWith(".class") } + } + if (classCount > limit) { + throw GradleException( + "Minimized jar retained $classCount classes (> $limit) — minimize() is not pruning." + ) + } + logger.lifecycle("Minimized jar OK: $classCount classes retained (limit $limit).") + } +} + +tasks.named("check") { + dependsOn(verifyMinimizedJar) +} diff --git a/distribution/server/build.gradle.kts b/distribution/server/build.gradle.kts index 7f489628adfb3..2ab146787c694 100644 --- a/distribution/server/build.gradle.kts +++ b/distribution/server/build.gradle.kts @@ -71,6 +71,9 @@ val distLib by configurations.creating { exclude(group = "com.google.android", module = "annotations") // Annotation libraries not needed at runtime exclude(group = "org.codehaus.mojo", module = "animal-sniffer-annotations") + // The full fastutil jar (~25MB) is replaced by :pulsar-broker-fastutil-minimized below, + // which ships only the fastutil classes actually used on the server (and client) side. + exclude(group = "it.unimi.dsi", module = "fastutil") } // Resolvable configurations for cross-project artifact dependencies. @@ -91,6 +94,9 @@ dependencies { // Version constraints from the enforced platform (inherited via implementation, // which distLib extends) ensure consistent versions without manual resolutionStrategy. distLib(project(":pulsar-broker")) + // Minimized fastutil (replaces the full fastutil jar excluded from distLib above): only the + // fastutil classes reachable from the broker and the bundled pulsar-client-original. + distLib(project(":pulsar-broker-fastutil-minimized")) distLib(project(":pulsar-metadata")) distLib(project(":pulsar-docs-tools")) distLib(project(":pulsar-proxy")) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index b330e1cea86e6..b6920d8bf92fd 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -262,6 +262,7 @@ The Apache Software License, Version 2.0 - com.fasterxml.jackson.module-jackson-module-parameter-names-2.21.3.jar * Caffeine -- com.github.ben-manes.caffeine-caffeine-3.2.4.jar * Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar + * Fastutil -- it.unimi.dsi:fastutil (only the classes used by Pulsar, bundled within pulsar-broker-fastutil-minimized) * LMAX Disruptor -- com.lmax-disruptor-4.0.0.jar * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.63.2.jar * Bitbucket -- org.bitbucket.b_c-jose4j-0.9.6.jar diff --git a/distribution/shell/build.gradle.kts b/distribution/shell/build.gradle.kts index 9aa6b3d18e2bb..05452ce0aa55b 100644 --- a/distribution/shell/build.gradle.kts +++ b/distribution/shell/build.gradle.kts @@ -42,9 +42,15 @@ val distLib by configurations.creating { exclude(group = "net.java.dev.jna", module = "jna-platform") exclude(group = "io.netty", module = "netty-transport-native-kqueue") exclude(group = "io.prometheus", module = "simpleclient_caffeine") + // The full fastutil jar (~24MB) is replaced by :pulsar-client-fastutil-minimized below, + // which ships only the fastutil classes the client-side modules actually use. + exclude(group = "it.unimi.dsi", module = "fastutil") } dependencies { distLib(project(":pulsar-client-tools")) + // Minimized fastutil (replaces the full fastutil jar excluded from distLib above): only the + // fastutil classes reachable from the client-side modules (client, client-tools, admin). + distLib(project(":pulsar-client-fastutil-minimized")) distLib(libs.log4j.core) distLib(libs.log4j.web) distLib(libs.log4j.layout.template.json) diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 115851264bfdb..3b1bc4f8800b2 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -427,6 +427,7 @@ The Apache Software License, Version 2.0 * RE2j -- re2j-1.8.jar * Spotify completable-futures -- completable-futures-0.3.6.jar * RoaringBitmap -- RoaringBitmap-1.6.9.jar + * Fastutil -- it.unimi.dsi:fastutil (only the classes used by Pulsar, bundled within pulsar-client-fastutil-minimized) * JSpecify -- jspecify-1.0.0.jar * JetBrains Annotations -- annotations-26.1.0.jar diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e0c801ed65452..ae846de9b6bf9 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -80,6 +80,7 @@ opentelemetry-gcp-resources = "1.57.0-alpha" # Data structures / Utils guava = "33.6.0-jre" caffeine = "3.2.4" +fastutil = "8.5.18" jctools = "4.0.6" roaringbitmap = "1.6.9" hppc = "0.9.1" @@ -175,7 +176,7 @@ thrift = "0.23.0" datasketches-memory = "4.1.0" datasketches-java = "7.0.1" # Shading -shadow = "9.4.1" +shadow = "9.4.2" [libraries] # SLF4J @@ -330,6 +331,7 @@ rocksdbjni = { module = "org.rocksdb:rocksdbjni", version.ref = "rocksdb" } error-prone-annotations = { module = "com.google.errorprone:error_prone_annotations", version.ref = "errorprone" } # Data structures caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref = "caffeine" } +fastutil = { module = "it.unimi.dsi:fastutil", version.ref = "fastutil" } jctools-core = { module = "org.jctools:jctools-core", version.ref = "jctools" } jctools-core-jdk11 = { module = "org.jctools:jctools-core-jdk11", version.ref = "jctools" } roaringbitmap = { module = "org.roaringbitmap:RoaringBitmap", version.ref = "roaringbitmap" } diff --git a/pulsar-broker-fastutil-minimized/build.gradle.kts b/pulsar-broker-fastutil-minimized/build.gradle.kts new file mode 100644 index 0000000000000..54dc1e5beacf4 --- /dev/null +++ b/pulsar-broker-fastutil-minimized/build.gradle.kts @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Jar of just the fastutil classes reachable from the Pulsar server side. It replaces the +// full fastutil jar in the server distribution so the docker image / tarball ships only the +// classes actually used. The roots cover both the broker and (since it is bundled in the +// server distribution) the unrelocated pulsar-client-original, so this is a superset of +// :pulsar-client-fastutil-minimized. See pulsar.minimized-dependencies-conventions. + +plugins { + id("pulsar.minimized-dependencies-conventions") +} + +dependencies { + // Reachability roots: every Pulsar project that uses fastutil and ends up in the server + // distribution. minimize() keeps the union of fastutil classes reachable from these. + api(project(":pulsar-broker")) + api(project(":pulsar-client-original")) +} + +minimizedJar { + minimizedDependencies.set(listOf("it.unimi.dsi:fastutil")) + // The reachable set (broker + client usage) is ~818 classes; fail the build if it grows + // past this (bump it when new fastutil usage legitimately enlarges the set). + maxRetainedClasses.set(850) +} diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts index 6fd3778c0c388..c177133647880 100644 --- a/pulsar-broker/build.gradle.kts +++ b/pulsar-broker/build.gradle.kts @@ -49,6 +49,7 @@ dependencies { api(libs.commons.lang3) api(libs.netty.transport) implementation(libs.protobuf.java) + implementation(libs.fastutil) implementation(libs.curator.recipes) implementation(libs.bookkeeper.stream.storage.server) { exclude(group = "org.apache.bookkeeper") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 8e9b8343704ea..c2e1c63b40020 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -21,9 +21,14 @@ import com.google.common.annotations.VisibleForTesting; import io.github.merlimat.slog.Logger; import io.netty.util.Timer; +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; import java.time.Clock; import java.util.NavigableSet; -import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -31,7 +36,6 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; -import org.apache.pulsar.common.util.collections.LongOpenHashSet; import org.roaringbitmap.longlong.Roaring64Bitmap; public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { @@ -40,9 +44,9 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack protected final Logger log; // timestamp -> ledgerId -> entryId - // TreeMap -> TreeMap -> RoaringBitmap - protected final TreeMap> - delayedMessageMap = new TreeMap<>(); + // AVL tree -> OpenHashMap -> RoaringBitmap + protected final Long2ObjectSortedMap> + delayedMessageMap = new Long2ObjectAVLTreeMap<>(); // If we detect that all messages have fixed delay time, such that the delivery is // always going to be in FIFO order, then we can avoid pulling all the messages in @@ -137,7 +141,7 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { .log("Add message"); long timestamp = roundTimestamp(deliverAt); - Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, k -> new TreeMap<>()) + Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); // Roaring64Bitmap does not store duplicates, so track if it a new element // so we can keep delayedMessagesCount in sync @@ -194,7 +198,7 @@ private void checkAndUpdateHighest(long deliverAt) { @Override public boolean hasMessageAvailable() { boolean hasMessageAvailable = !delayedMessageMap.isEmpty() - && delayedMessageMap.firstKey() <= getCutoffTime(); + && delayedMessageMap.firstLongKey() <= getCutoffTime(); if (!hasMessageAvailable) { updateTimer(); } @@ -211,15 +215,15 @@ public NavigableSet getScheduledMessages(int maxMessages) { long cutoffTime = getCutoffTime(); while (n > 0 && !delayedMessageMap.isEmpty()) { - long timestamp = delayedMessageMap.firstKey(); + long timestamp = delayedMessageMap.firstLongKey(); if (timestamp > cutoffTime) { break; } - LongOpenHashSet ledgerIdToDelete = new LongOpenHashSet(); - TreeMap ledgerMap = delayedMessageMap.get(timestamp); - for (var ledgerEntry : ledgerMap.entrySet()) { - long ledgerId = ledgerEntry.getKey(); + LongSet ledgerIdToDelete = new LongOpenHashSet(); + Long2ObjectSortedMap ledgerMap = delayedMessageMap.get(timestamp); + for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + long ledgerId = ledgerEntry.getLongKey(); Roaring64Bitmap entryIds = ledgerEntry.getValue(); long cardinality = entryIds.getLongCardinality(); if (cardinality <= n) { @@ -309,6 +313,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead } protected long nextDeliveryTime() { - return delayedMessageMap.firstKey(); + return delayedMessageMap.firstLongKey(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 0f4708fa39fe7..0f8f313c6a298 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -28,6 +28,8 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.opentelemetry.api.common.Attributes; +import it.unimi.dsi.fastutil.ints.IntIntPair; +import it.unimi.dsi.fastutil.objects.ObjectIntPair; import java.time.Instant; import java.util.ArrayList; import java.util.BitSet; @@ -72,8 +74,6 @@ import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.IntIntPair; -import org.apache.pulsar.common.util.collections.ObjectIntPair; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.transaction.common.exception.TransactionConflictException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 9edc88f88cf9e..5393cf4d6a6a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -32,7 +33,6 @@ import org.apache.pulsar.common.policies.data.DrainingHash; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl; -import org.apache.pulsar.common.util.collections.Int2ObjectOpenHashMap; import org.roaringbitmap.RoaringBitmap; /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java index 0d684eb17361b..669562055214c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java @@ -18,13 +18,13 @@ */ package org.apache.pulsar.broker.service; +import it.unimi.dsi.fastutil.longs.Long2IntMap; +import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import java.util.List; import java.util.concurrent.locks.StampedLock; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.common.util.collections.Long2IntMap; -import org.apache.pulsar.common.util.collections.Long2IntOpenHashMap; -import org.apache.pulsar.common.util.collections.Long2ObjectMap; -import org.apache.pulsar.common.util.collections.Long2ObjectOpenHashMap; public class InMemoryRedeliveryTracker implements RedeliveryTracker { // ledgerId -> entryId -> count diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 48d912d174c9e..9eb43efbd46db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -18,14 +18,15 @@ */ package org.apache.pulsar.broker.service; -import java.util.Iterator; -import java.util.Map; -import java.util.TreeMap; +import it.unimi.dsi.fastutil.ints.IntIntPair; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import it.unimi.dsi.fastutil.objects.ObjectBidirectionalIterator; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; -import org.apache.pulsar.common.util.collections.IntIntPair; /** * A thread-safe map to store pending acks in the consumer. @@ -97,7 +98,7 @@ public interface PendingAcksConsumer { } private final Consumer consumer; - private final TreeMap> pendingAcks; + private final Long2ObjectSortedMap> pendingAcks; private final Supplier pendingAcksAddHandlerSupplier; private final Supplier pendingAcksRemoveHandlerSupplier; private final Lock readLock; @@ -108,7 +109,7 @@ public interface PendingAcksConsumer { PendingAcksMap(Consumer consumer, Supplier pendingAcksAddHandlerSupplier, Supplier pendingAcksRemoveHandlerSupplier) { this.consumer = consumer; - this.pendingAcks = new TreeMap<>(); + this.pendingAcks = new Long2ObjectRBTreeMap<>(); this.pendingAcksAddHandlerSupplier = pendingAcksAddHandlerSupplier; this.pendingAcksRemoveHandlerSupplier = pendingAcksRemoveHandlerSupplier; ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -144,8 +145,8 @@ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remaining && !pendingAcksAddHandler.handleAdding(consumer, ledgerId, entryId, stickyKeyHash)) { return false; } - TreeMap ledgerPendingAcks = - pendingAcks.computeIfAbsent(ledgerId, k -> new TreeMap<>()); + Long2ObjectSortedMap ledgerPendingAcks = + pendingAcks.computeIfAbsent(ledgerId, k -> new Long2ObjectRBTreeMap<>()); IntIntPair previous = ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked, stickyKeyHash)); if (previous == null) { size++; @@ -183,12 +184,12 @@ public void forEach(PendingAcksConsumer processor) { private void processPendingAcks(PendingAcksConsumer processor) { // this code uses for loops intentionally, don't refactor to use forEach // iterate the outer map - for (Map.Entry> entry : pendingAcks.entrySet()) { - long ledgerId = entry.getKey(); - TreeMap ledgerPendingAcks = entry.getValue(); + for (Long2ObjectMap.Entry> entry : pendingAcks.long2ObjectEntrySet()) { + long ledgerId = entry.getLongKey(); + Long2ObjectSortedMap ledgerPendingAcks = entry.getValue(); // iterate the inner map - for (Map.Entry e : ledgerPendingAcks.entrySet()) { - long entryId = e.getKey(); + for (Long2ObjectMap.Entry e : ledgerPendingAcks.long2ObjectEntrySet()) { + long entryId = e.getLongKey(); IntIntPair batchSizeAndStickyKeyHash = e.getValue(); processor.accept(ledgerId, entryId, batchSizeAndStickyKeyHash.leftInt(), batchSizeAndStickyKeyHash.rightInt()); @@ -254,7 +255,7 @@ private void internalForEachAndClear(PendingAcksConsumer processor, boolean clos public boolean contains(long ledgerId, long entryId) { try { readLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -274,7 +275,7 @@ public boolean contains(long ledgerId, long entryId) { public IntIntPair get(long ledgerId, long entryId) { try { readLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return null; } @@ -296,7 +297,7 @@ public IntIntPair get(long ledgerId, long entryId) { public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyHash) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -326,7 +327,7 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -352,7 +353,7 @@ public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelt public boolean remove(long ledgerId, long entryId) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return false; } @@ -384,7 +385,7 @@ public boolean remove(long ledgerId, long entryId) { public IntIntPair removeAndGet(long ledgerId, long entryId) { try { writeLock.lock(); - TreeMap ledgerMap = pendingAcks.get(ledgerId); + Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { return null; } @@ -443,23 +444,23 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry } else { readLock.lock(); } - Iterator>> ledgerMapIterator = - pendingAcks.headMap(markDeleteLedgerId + 1).entrySet().iterator(); + ObjectBidirectionalIterator>> ledgerMapIterator = + pendingAcks.headMap(markDeleteLedgerId + 1).long2ObjectEntrySet().iterator(); while (ledgerMapIterator.hasNext()) { - Map.Entry> entry = ledgerMapIterator.next(); - long ledgerId = entry.getKey(); - TreeMap ledgerMap = entry.getValue(); - TreeMap ledgerMapHead; + Long2ObjectMap.Entry> entry = ledgerMapIterator.next(); + long ledgerId = entry.getLongKey(); + Long2ObjectSortedMap ledgerMap = entry.getValue(); + Long2ObjectSortedMap ledgerMapHead; if (ledgerId == markDeleteLedgerId) { - ledgerMapHead = new TreeMap<>(ledgerMap.headMap(markDeleteEntryId + 1)); + ledgerMapHead = ledgerMap.headMap(markDeleteEntryId + 1); } else { ledgerMapHead = ledgerMap; } - Iterator> entryMapIterator = - ledgerMapHead.entrySet().iterator(); + ObjectBidirectionalIterator> entryMapIterator = + ledgerMapHead.long2ObjectEntrySet().iterator(); while (entryMapIterator.hasNext()) { - Map.Entry intIntPairEntry = entryMapIterator.next(); - long entryId = intIntPairEntry.getKey(); + Long2ObjectMap.Entry intIntPairEntry = entryMapIterator.next(); + long entryId = intIntPairEntry.getLongKey(); if (!acquiredWriteLock) { retryWithWriteLock = true; return; @@ -478,10 +479,6 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry removedEntryCallback.accept(ledgerId, entryId, batchSize, stickyKeyHash); } entryMapIterator.remove(); - // also remove from the original map if we're iterating a copy - if (ledgerId == markDeleteLedgerId) { - ledgerMap.remove(entryId); - } size--; } if (ledgerMap.isEmpty()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 02055b28f3b53..7b7b6596f0ca2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET; import com.google.common.annotations.VisibleForTesting; import io.github.merlimat.slog.Logger; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -58,7 +59,6 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.IntOpenHashSet; public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers implements StickyKeyDispatcher { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index f8a9dd8304294..1c16ed4228f33 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -272,7 +272,7 @@ public void run(Timeout timeout) throws Exception { return; } try { - this.delayedMessageMap.firstKey(); + this.delayedMessageMap.firstLongKey(); } catch (Exception e) { e.printStackTrace(); exceptions[0] = e; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index 8c033d4f731b2..c13babb126a30 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -29,9 +29,9 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import it.unimi.dsi.fastutil.ints.IntIntPair; import java.util.ArrayList; import java.util.List; -import org.apache.pulsar.common.util.collections.IntIntPair; import org.testng.annotations.Test; public class PendingAcksMapTest { diff --git a/pulsar-client-admin-shaded/build.gradle.kts b/pulsar-client-admin-shaded/build.gradle.kts index 8fe0120d3a264..fbe36d769c2b2 100644 --- a/pulsar-client-admin-shaded/build.gradle.kts +++ b/pulsar-client-admin-shaded/build.gradle.kts @@ -24,7 +24,12 @@ plugins { dependencies { // Bundled into the shaded jar (kept on the runtime classpath so shadowJar packs them): - implementation(project(":pulsar-client-admin-original")) + implementation(project(":pulsar-client-admin-original")) { + // fastutil is bundled via :pulsar-client-fastutil-minimized, which packs only the + // (unrelocated) fastutil classes actually used instead of the full ~25MB jar. + exclude(group = "it.unimi.dsi", module = "fastutil") + } + implementation(project(":pulsar-client-fastutil-minimized")) implementation(project(":pulsar-client-messagecrypto-bc")) // Non-bundled runtime dependencies for the dependency-reduced published POM/GMM (the `shadow` diff --git a/pulsar-client-all/build.gradle.kts b/pulsar-client-all/build.gradle.kts index 7c49a7d5465ae..6f7406d266ad4 100644 --- a/pulsar-client-all/build.gradle.kts +++ b/pulsar-client-all/build.gradle.kts @@ -24,7 +24,12 @@ plugins { dependencies { // Bundled into the shaded jar (kept on the runtime classpath so shadowJar packs them): - implementation(project(":pulsar-client-original")) + implementation(project(":pulsar-client-original")) { + // fastutil is bundled via :pulsar-client-fastutil-minimized, which packs only the + // (unrelocated) fastutil classes actually used instead of the full ~25MB jar. + exclude(group = "it.unimi.dsi", module = "fastutil") + } + implementation(project(":pulsar-client-fastutil-minimized")) implementation(project(":pulsar-client-admin-original")) implementation(project(":pulsar-client-messagecrypto-bc")) diff --git a/pulsar-client-fastutil-minimized/build.gradle.kts b/pulsar-client-fastutil-minimized/build.gradle.kts new file mode 100644 index 0000000000000..4225bd63089ac --- /dev/null +++ b/pulsar-client-fastutil-minimized/build.gradle.kts @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Jar of just the fastutil classes reachable from the Pulsar client. It is bundled (and +// relocated) by :pulsar-client-shaded, :pulsar-client-all and :pulsar-client-admin-shaded +// so the full ~25MB fastutil jar is not shipped. See pulsar.minimized-dependencies-conventions. + +plugins { + id("pulsar.minimized-dependencies-conventions") + // Published to Maven so it can be referenced from pulsar-client-original's published metadata + // (where the full fastutil dependency is replaced by this minimized jar). The broker variant is + // not published — it is only consumed by the server distribution. + id("pulsar.publish-conventions") +} + +dependencies { + // Reachability roots: the client-side modules that ship together (in the client shaded jars, + // the published pulsar-client-original, and the shell distribution). minimize() seeds from + // their transitive closures, so this jar contains every fastutil class any of them reach. + // Today only pulsar-client-original (NegativeAcksTracker) uses fastutil directly, but listing + // pulsar-client-tools and pulsar-client-admin-original as roots future-proofs the set: if they + // start using fastutil, the needed classes are pulled in automatically with no further wiring. + api(project(":pulsar-client-original")) + api(project(":pulsar-client-tools")) + api(project(":pulsar-client-admin-original")) +} + +minimizedJar { + minimizedDependencies.set(listOf("it.unimi.dsi:fastutil")) + // The reachable set is ~591 classes; fail the build if it grows past this (e.g. if + // minimize() regresses and ships the full ~12,965-class jar). + maxRetainedClasses.set(600) +} diff --git a/pulsar-client-shaded/build.gradle.kts b/pulsar-client-shaded/build.gradle.kts index 654e50f7273c5..74399ac50d41f 100644 --- a/pulsar-client-shaded/build.gradle.kts +++ b/pulsar-client-shaded/build.gradle.kts @@ -24,7 +24,12 @@ plugins { dependencies { // Bundled into the shaded jar (kept on the runtime classpath so shadowJar packs them): - implementation(project(":pulsar-client-original")) + implementation(project(":pulsar-client-original")) { + // fastutil is bundled via :pulsar-client-fastutil-minimized, which packs only the + // (unrelocated) fastutil classes actually used instead of the full ~25MB jar. + exclude(group = "it.unimi.dsi", module = "fastutil") + } + implementation(project(":pulsar-client-fastutil-minimized")) implementation(project(":pulsar-client-messagecrypto-bc")) // Non-bundled runtime dependencies. These are the ONLY entries in the dependency-reduced diff --git a/pulsar-client/build.gradle.kts b/pulsar-client/build.gradle.kts index c6c623a9b82de..5738c85a0c9f1 100644 --- a/pulsar-client/build.gradle.kts +++ b/pulsar-client/build.gradle.kts @@ -17,6 +17,8 @@ * under the License. */ +import org.gradle.api.publish.tasks.GenerateModuleMetadata + plugins { id("pulsar.public-java-library-conventions") alias(libs.plugins.protobuf) @@ -61,6 +63,7 @@ dependencies { implementation(libs.jsr305) api(libs.jspecify) implementation(libs.roaringbitmap) + implementation(libs.fastutil) compileOnly(libs.swagger.annotations) compileOnly(libs.protobuf.java) @@ -109,3 +112,68 @@ val generateTestAvro by tasks.registering(JavaExec::class) { sourceSets["test"].java.srcDir(generateTestAvro.map { layout.buildDirectory.dir("generated-sources/avro-test").get() }) tasks.named("compileTestJava") { dependsOn(generateTestAvro) } + +// --------------------------------------------------------------------------- +// Publish-time dependency replacement: in the PUBLISHED POM and Gradle Module Metadata only, +// replace the full fastutil dependency with the minimized :pulsar-client-fastutil-minimized jar, +// so Maven/Gradle consumers of pulsar-client-original pull in just the fastutil classes the client +// uses instead of the full ~25MB jar. +// +// This is publication-only on purpose: intra-build, pulsar-client-original keeps exposing the full +// fastutil (pulsar-broker depends on it and uses more fastutil classes than the client minimized +// set, so a build-scope swap would break the broker compile). There is deliberately no build-graph +// dependency on the minimized module either — it is built only when the shaded jars (or its own +// publication) are built, not every time a client class changes. +run { + val fromGroup = "it.unimi.dsi" + val fromName = "fastutil" + val toGroup = "org.apache.pulsar" + val toName = "pulsar-client-fastutil-minimized" + val toVersion = version.toString() + + // POM (Maven consumers): rewrite the fastutil block in place. + publishing.publications.withType().configureEach { + pom.withXml { + val sb = asString() + val replaced = sb.toString().replace( + Regex( + "\\s*\\Q$fromGroup\\E\\s*" + + "\\Q$fromName\\E.*?", + RegexOption.DOT_MATCHES_ALL + ), + "\n $toGroup\n" + + " $toName\n" + + " $toVersion\n" + + " runtime\n " + ) + sb.setLength(0) + sb.append(replaced) + } + } + + // Gradle Module Metadata (Gradle consumers): post-process the generated .module JSON, since GMM + // has no per-dependency rewrite API. Runs as the tail of the generate task so the published (and + // signed) file is the rewritten one. Captures only Providers/strings -> configuration-cache safe. + tasks.withType().configureEach { + val moduleFile = outputFile + doLast { + val file = moduleFile.get().asFile + @Suppress("UNCHECKED_CAST") + val json = groovy.json.JsonSlurper().parse(file) as MutableMap + val variants = json["variants"] as? List<*> ?: emptyList() + for (variant in variants) { + val deps = (variant as? Map<*, *>)?.get("dependencies") as? List<*> ?: continue + for (dep in deps) { + @Suppress("UNCHECKED_CAST") + val d = dep as? MutableMap ?: continue + if (d["group"] == fromGroup && d["module"] == fromName) { + d["group"] = toGroup + d["module"] = toName + d["version"] = linkedMapOf("requires" to toVersion) + } + } + } + file.writeText(groovy.json.JsonOutput.prettyPrint(groovy.json.JsonOutput.toJson(json))) + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index 7bbf58a13672d..1d42d947bdd56 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -23,11 +23,14 @@ import io.netty.util.Timeout; import io.netty.util.Timer; import io.opentelemetry.api.trace.Span; +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator; import java.io.Closeable; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.TimeUnit; import lombok.CustomLog; import org.apache.pulsar.client.api.Message; @@ -36,8 +39,6 @@ import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.api.TraceableMessageId; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.common.util.collections.Long2ObjectMap; -import org.apache.pulsar.common.util.collections.Long2ObjectOpenHashMap; import org.roaringbitmap.longlong.Roaring64Bitmap; @CustomLog @@ -45,8 +46,8 @@ class NegativeAcksTracker implements Closeable { // timestamp -> ledgerId -> entryId, no need to batch index, if different messages have // different timestamp, there will be multiple entries in the map - // TreeMap -> LongOpenHashMap -> Roaring64Bitmap - private TreeMap> nackedMessages = null; + // RB Tree -> LongOpenHashMap -> Roaring64Bitmap + private Long2ObjectSortedMap> nackedMessages = null; private final Long2ObjectMap> nackedMessageIds = new Long2ObjectOpenHashMap<>(); private final ConsumerBase consumer; @@ -86,7 +87,9 @@ private void triggerRedelivery(Timeout t) { } Long2ObjectMap ledgerMap = nackedMessages.get(timestamp); - ledgerMap.forEach((ledgerId, entrySet) -> { + for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entrySet = ledgerEntry.getValue(); entrySet.forEach(entryId -> { MessageId msgId = null; Long2ObjectMap entryMap = nackedMessageIds.get(ledgerId); @@ -102,13 +105,13 @@ private void triggerRedelivery(Timeout t) { addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer); messagesToRedeliver.add(msgId); }); - }); + } } // remove entries from the nackedMessages map - Iterator iterator = nackedMessages.keySet().iterator(); + LongBidirectionalIterator iterator = nackedMessages.keySet().iterator(); while (iterator.hasNext()) { - long timestamp = iterator.next(); + long timestamp = iterator.nextLong(); if (timestamp <= currentTimestamp) { iterator.remove(); } else { @@ -118,7 +121,7 @@ private void triggerRedelivery(Timeout t) { // Schedule the next redelivery if there are still messages to redeliver if (!nackedMessages.isEmpty()) { - long nextTriggerTimestamp = nackedMessages.firstKey(); + long nextTriggerTimestamp = nackedMessages.firstLongKey(); long delayMs = Math.max(nextTriggerTimestamp - currentTimestamp, 0); if (delayMs > 0) { this.timeout = timer.newTimeout(this::triggerRedelivery, delayMs, TimeUnit.MILLISECONDS); @@ -164,7 +167,7 @@ private synchronized void add(MessageId messageId, int redeliveryCount) { } if (nackedMessages == null) { - nackedMessages = new TreeMap<>(); + nackedMessages = new Long2ObjectAVLTreeMap<>(); } long backoffMs; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMap.java deleted file mode 100644 index ada501319d7dc..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMap.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -/** - * Open-addressing hash map with primitive int keys and object values. - * Uses linear probing and fibonacci hashing. - * Not thread-safe. - */ -@SuppressWarnings("unchecked") -public class Int2ObjectOpenHashMap { - - private static final float LOAD_FACTOR = 0.75f; - private static final int MIN_CAPACITY = 16; - - private int[] keys; - private Object[] values; - private boolean[] used; - private int size; - private int capacity; - private int threshold; - - public Int2ObjectOpenHashMap() { - this(MIN_CAPACITY); - } - - public Int2ObjectOpenHashMap(int expectedItems) { - int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); - keys = new int[cap]; - values = new Object[cap]; - used = new boolean[cap]; - capacity = cap; - threshold = (int) (cap * LOAD_FACTOR); - } - - public V get(int key) { - int idx = indexOf(key); - return idx >= 0 ? (V) values[idx] : null; - } - - public V put(int key, V value) { - int idx = indexOf(key); - if (idx >= 0) { - V old = (V) values[idx]; - values[idx] = value; - return old; - } - if (size >= threshold) { - rehash(capacity * 2); - } - insertNew(key, value); - return null; - } - - public V remove(int key) { - int idx = indexOf(key); - if (idx < 0) { - return null; - } - V old = (V) values[idx]; - removeAt(idx); - return old; - } - - /** - * Remove the entry only if it maps to the given value (by reference equality). - * - * @return true if the entry was removed - */ - public boolean remove(int key, Object value) { - int idx = indexOf(key); - if (idx < 0 || values[idx] != value) { - return false; - } - removeAt(idx); - return true; - } - - public boolean isEmpty() { - return size == 0; - } - - public int size() { - return size; - } - - public void clear() { - if (size > 0) { - java.util.Arrays.fill(used, false); - java.util.Arrays.fill(values, null); - size = 0; - } - } - - private int indexOf(int key) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (true) { - if (!used[idx]) { - return -1; - } - if (keys[idx] == key) { - return idx; - } - idx = (idx + 1) & mask; - } - } - - private void insertNew(int key, V value) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (used[idx]) { - idx = (idx + 1) & mask; - } - keys[idx] = key; - values[idx] = value; - used[idx] = true; - size++; - } - - private void removeAt(int idx) { - int mask = capacity - 1; - values[idx] = null; - size--; - int next = (idx + 1) & mask; - while (used[next]) { - int naturalSlot = hash(keys[next]) & mask; - if ((next > idx && (naturalSlot <= idx || naturalSlot > next)) - || (next < idx && (naturalSlot <= idx && naturalSlot > next))) { - keys[idx] = keys[next]; - values[idx] = values[next]; - values[next] = null; - idx = next; - } - next = (next + 1) & mask; - } - used[idx] = false; - } - - private void rehash(int newCapacity) { - int[] oldKeys = keys; - Object[] oldValues = values; - boolean[] oldUsed = used; - int oldCapacity = capacity; - - capacity = newCapacity; - keys = new int[newCapacity]; - values = new Object[newCapacity]; - used = new boolean[newCapacity]; - threshold = (int) (newCapacity * LOAD_FACTOR); - size = 0; - - for (int i = 0; i < oldCapacity; i++) { - if (oldUsed[i]) { - insertNew(oldKeys[i], (V) oldValues[i]); - } - } - } - - /** - * Fibonacci hashing for int keys. - */ - private static int hash(int key) { - int h = key * 0x9E3779B9; - return h ^ (h >>> 16); - } - - private static int tableSizeFor(int cap) { - int n = cap - 1; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntIntPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntIntPair.java deleted file mode 100644 index ee6540164c05d..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntIntPair.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -/** - * An immutable pair of int values. - */ -public record IntIntPair(int leftInt, int rightInt) { - public static IntIntPair of(int left, int right) { - return new IntIntPair(left, right); - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntOpenHashSet.java deleted file mode 100644 index be7ffd8076e44..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntOpenHashSet.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -/** - * Open-addressing hash set for primitive int values. - * Not thread-safe. - */ -public class IntOpenHashSet { - - private static final float LOAD_FACTOR = 0.75f; - private static final int MIN_CAPACITY = 16; - - private int[] keys; - private boolean[] used; - private int size; - private int capacity; - private int threshold; - - public IntOpenHashSet() { - this(MIN_CAPACITY); - } - - public IntOpenHashSet(int expectedItems) { - int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); - keys = new int[cap]; - used = new boolean[cap]; - capacity = cap; - threshold = (int) (cap * LOAD_FACTOR); - } - - public boolean add(int key) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (true) { - if (!used[idx]) { - if (size >= threshold) { - rehash(capacity * 2); - return add(key); - } - keys[idx] = key; - used[idx] = true; - size++; - return true; - } - if (keys[idx] == key) { - return false; - } - idx = (idx + 1) & mask; - } - } - - public boolean contains(int key) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (true) { - if (!used[idx]) { - return false; - } - if (keys[idx] == key) { - return true; - } - idx = (idx + 1) & mask; - } - } - - public int size() { - return size; - } - - public boolean isEmpty() { - return size == 0; - } - - private void rehash(int newCapacity) { - int[] oldKeys = keys; - boolean[] oldUsed = used; - int oldCapacity = capacity; - - capacity = newCapacity; - keys = new int[newCapacity]; - used = new boolean[newCapacity]; - threshold = (int) (newCapacity * LOAD_FACTOR); - size = 0; - - for (int i = 0; i < oldCapacity; i++) { - if (oldUsed[i]) { - add(oldKeys[i]); - } - } - } - - private static int hash(int key) { - int h = key * 0x9E3779B9; - return h ^ (h >>> 16); - } - - private static int tableSizeFor(int cap) { - int n = cap - 1; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntMap.java deleted file mode 100644 index 10b2e30b9e478..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntMap.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.function.LongToIntFunction; - -/** - * A map with primitive {@code long} keys and primitive {@code int} values. - * - *

The default return value for missing keys is {@code 0}. - */ -public interface Long2IntMap { - - /** - * Returns the value for the given key, or {@code 0} if not present. - * - * @param key the key - * @return the mapped value, or {@code 0} - */ - int get(long key); - - /** - * Associates the given value with the given key. - * - * @param key the key - * @param value the value - * @return the previous value, or {@code 0} if there was no mapping - */ - int put(long key, int value); - - /** - * Removes the mapping for the given key. - * - * @param key the key - * @return the previous value, or {@code 0} if there was no mapping - */ - int remove(long key); - - /** - * Returns the value for the given key, or the specified default if not present. - * - * @param key the key - * @param defaultValue the default value to return if the key is absent - * @return the mapped value, or {@code defaultValue} - */ - int getOrDefault(long key, int defaultValue); - - /** - * If the key is not already present, computes its value using the given function - * and inserts it. - * - * @param key the key - * @param mappingFunction the function to compute a value - * @return the current (existing or computed) value - */ - int computeIfAbsent(long key, LongToIntFunction mappingFunction); - - /** - * Returns {@code true} if this map contains no entries. - */ - boolean isEmpty(); - - /** - * Removes all entries from this map. - */ - void clear(); -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMap.java deleted file mode 100644 index 22e876dc68373..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMap.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.function.LongToIntFunction; - -/** - * Open-addressing hash map with primitive long keys and primitive int values. - * Uses linear probing and fibonacci hashing. Returns 0 for missing keys. - * Not thread-safe. - */ -public class Long2IntOpenHashMap implements Long2IntMap { - - private static final float LOAD_FACTOR = 0.75f; - private static final int MIN_CAPACITY = 16; - - private long[] keys; - private int[] values; - private boolean[] used; - private int size; - private int capacity; - private int threshold; - - public Long2IntOpenHashMap() { - this(MIN_CAPACITY); - } - - public Long2IntOpenHashMap(int expectedItems) { - int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); - keys = new long[cap]; - values = new int[cap]; - used = new boolean[cap]; - capacity = cap; - threshold = (int) (cap * LOAD_FACTOR); - } - - @Override - public int get(long key) { - int idx = indexOf(key); - return idx >= 0 ? values[idx] : 0; - } - - @Override - public int put(long key, int value) { - int idx = indexOf(key); - if (idx >= 0) { - int old = values[idx]; - values[idx] = value; - return old; - } - if (size >= threshold) { - rehash(capacity * 2); - } - insertNew(key, value); - return 0; - } - - @Override - public int remove(long key) { - int idx = indexOf(key); - if (idx < 0) { - return 0; - } - int old = values[idx]; - removeAt(idx); - return old; - } - - @Override - public int getOrDefault(long key, int defaultValue) { - int idx = indexOf(key); - return idx >= 0 ? values[idx] : defaultValue; - } - - @Override - public int computeIfAbsent(long key, LongToIntFunction mappingFunction) { - int idx = indexOf(key); - if (idx >= 0) { - return values[idx]; - } - int value = mappingFunction.applyAsInt(key); - if (size >= threshold) { - rehash(capacity * 2); - } - insertNew(key, value); - return value; - } - - @Override - public boolean isEmpty() { - return size == 0; - } - - @Override - public void clear() { - if (size > 0) { - java.util.Arrays.fill(used, false); - size = 0; - } - } - - private int indexOf(long key) { - int mask = capacity - 1; - int idx = Long2ObjectOpenHashMap.hash(key) & mask; - while (true) { - if (!used[idx]) { - return -1; - } - if (keys[idx] == key) { - return idx; - } - idx = (idx + 1) & mask; - } - } - - private void insertNew(long key, int value) { - int mask = capacity - 1; - int idx = Long2ObjectOpenHashMap.hash(key) & mask; - while (used[idx]) { - idx = (idx + 1) & mask; - } - keys[idx] = key; - values[idx] = value; - used[idx] = true; - size++; - } - - private void removeAt(int idx) { - int mask = capacity - 1; - size--; - int next = (idx + 1) & mask; - while (used[next]) { - int naturalSlot = Long2ObjectOpenHashMap.hash(keys[next]) & mask; - if ((next > idx && (naturalSlot <= idx || naturalSlot > next)) - || (next < idx && (naturalSlot <= idx && naturalSlot > next))) { - keys[idx] = keys[next]; - values[idx] = values[next]; - idx = next; - } - next = (next + 1) & mask; - } - used[idx] = false; - } - - private void rehash(int newCapacity) { - long[] oldKeys = keys; - int[] oldValues = values; - boolean[] oldUsed = used; - int oldCapacity = capacity; - - capacity = newCapacity; - keys = new long[newCapacity]; - values = new int[newCapacity]; - used = new boolean[newCapacity]; - threshold = (int) (newCapacity * LOAD_FACTOR); - size = 0; - - for (int i = 0; i < oldCapacity; i++) { - if (oldUsed[i]) { - insertNew(oldKeys[i], oldValues[i]); - } - } - } - - private static int tableSizeFor(int cap) { - int n = cap - 1; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectMap.java deleted file mode 100644 index 56891027d46d4..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectMap.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.Collection; -import java.util.function.LongFunction; - -/** - * A map with primitive {@code long} keys and object values. - * - *

Using primitive keys avoids the overhead of boxing {@code long} values into - * {@link Long} objects, reducing both memory usage and GC pressure compared to - * {@code Map}. - * - * @param the type of mapped values - */ -public interface Long2ObjectMap { - - /** - * Returns the value associated with the given key, or {@code null} if not present. - * - * @param key the key - * @return the value, or {@code null} - */ - V get(long key); - - /** - * Associates the given value with the given key. - * - * @param key the key - * @param value the value - * @return the previous value, or {@code null} if there was no mapping - */ - V put(long key, V value); - - /** - * Removes the mapping for the given key. - * - * @param key the key - * @return the previous value, or {@code null} if there was no mapping - */ - V remove(long key); - - /** - * If the key is not already present, computes its value using the given function - * and inserts it. - * - * @param key the key - * @param mappingFunction the function to compute a value - * @return the current (existing or computed) value - */ - V computeIfAbsent(long key, LongFunction mappingFunction); - - /** - * Returns {@code true} if this map contains no entries. - */ - boolean isEmpty(); - - /** - * Returns the number of entries in this map. - */ - int size(); - - /** - * Removes all entries from this map. - */ - void clear(); - - /** - * Iterates over all entries, calling the consumer with primitive long keys - * (no boxing). - * - * @param consumer the consumer to call for each entry - */ - void forEach(LongObjConsumer consumer); - - /** - * Returns a {@link Collection} view of the values in this map. - * The collection supports iteration and {@code stream()} operations. - */ - Collection values(); -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMap.java deleted file mode 100644 index 70bc888bc84b3..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMap.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.AbstractCollection; -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.function.LongFunction; - -/** - * Open-addressing hash map with primitive long keys and object values. - * Uses linear probing and fibonacci hashing for good distribution. - * Not thread-safe. - */ -@SuppressWarnings("unchecked") -public class Long2ObjectOpenHashMap implements Long2ObjectMap { - - private static final float LOAD_FACTOR = 0.75f; - private static final int MIN_CAPACITY = 16; - - private long[] keys; - private Object[] values; - private boolean[] used; - private int size; - private int capacity; - private int threshold; - - /** - * Creates a new map with default capacity. - */ - public Long2ObjectOpenHashMap() { - this(MIN_CAPACITY); - } - - /** - * Creates a new map sized to hold the expected number of items without rehashing. - * - * @param expectedItems the expected number of items - */ - public Long2ObjectOpenHashMap(int expectedItems) { - int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); - keys = new long[cap]; - values = new Object[cap]; - used = new boolean[cap]; - capacity = cap; - threshold = (int) (cap * LOAD_FACTOR); - } - - @Override - public V get(long key) { - int idx = indexOf(key); - if (idx >= 0) { - return (V) values[idx]; - } - return null; - } - - @Override - public V put(long key, V value) { - int idx = indexOf(key); - if (idx >= 0) { - V old = (V) values[idx]; - values[idx] = value; - return old; - } - if (size >= threshold) { - rehash(capacity * 2); - } - insertNew(key, value); - return null; - } - - @Override - public V remove(long key) { - int idx = indexOf(key); - if (idx < 0) { - return null; - } - V old = (V) values[idx]; - removeAt(idx); - return old; - } - - @Override - public V computeIfAbsent(long key, LongFunction mappingFunction) { - int idx = indexOf(key); - if (idx >= 0) { - return (V) values[idx]; - } - V value = mappingFunction.apply(key); - if (value != null) { - if (size >= threshold) { - rehash(capacity * 2); - } - insertNew(key, value); - } - return value; - } - - @Override - public boolean isEmpty() { - return size == 0; - } - - @Override - public int size() { - return size; - } - - @Override - public void clear() { - if (size > 0) { - java.util.Arrays.fill(used, false); - java.util.Arrays.fill(values, null); - size = 0; - } - } - - @Override - public void forEach(LongObjConsumer consumer) { - for (int i = 0; i < capacity; i++) { - if (used[i]) { - consumer.accept(keys[i], (V) values[i]); - } - } - } - - @Override - public Collection values() { - return new ValuesCollection(); - } - - /** - * Find the index of the given key, or -1 if not present. - */ - private int indexOf(long key) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (true) { - if (!used[idx]) { - return -1; - } - if (keys[idx] == key) { - return idx; - } - idx = (idx + 1) & mask; - } - } - - private void insertNew(long key, V value) { - int mask = capacity - 1; - int idx = hash(key) & mask; - while (used[idx]) { - idx = (idx + 1) & mask; - } - keys[idx] = key; - values[idx] = value; - used[idx] = true; - size++; - } - - /** - * Remove the entry at the given index using backward-shift deletion - * (no tombstones needed). - */ - private void removeAt(int idx) { - int mask = capacity - 1; - values[idx] = null; - size--; - // Shift back entries that may have been displaced by the removed entry - int next = (idx + 1) & mask; - while (used[next]) { - int naturalSlot = hash(keys[next]) & mask; - // Check if 'next' is displaced past 'idx' (wrapping around) - if ((next > idx && (naturalSlot <= idx || naturalSlot > next)) - || (next < idx && (naturalSlot <= idx && naturalSlot > next))) { - keys[idx] = keys[next]; - values[idx] = values[next]; - values[next] = null; - idx = next; - } - next = (next + 1) & mask; - } - used[idx] = false; - } - - private void rehash(int newCapacity) { - long[] oldKeys = keys; - Object[] oldValues = values; - boolean[] oldUsed = used; - int oldCapacity = capacity; - - capacity = newCapacity; - keys = new long[newCapacity]; - values = new Object[newCapacity]; - used = new boolean[newCapacity]; - threshold = (int) (newCapacity * LOAD_FACTOR); - size = 0; - - for (int i = 0; i < oldCapacity; i++) { - if (oldUsed[i]) { - insertNew(oldKeys[i], (V) oldValues[i]); - } - } - } - - /** - * Fibonacci hashing for long keys. - */ - static int hash(long key) { - long h = key * 0x9E3779B97F4A7C15L; - return (int) (h ^ (h >>> 32)); - } - - private static int tableSizeFor(int cap) { - int n = cap - 1; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; - } - - private class ValuesCollection extends AbstractCollection { - @Override - public Iterator iterator() { - return new Iterator<>() { - private int idx = findNext(0); - - @Override - public boolean hasNext() { - return idx < capacity; - } - - @Override - public V next() { - if (idx >= capacity) { - throw new NoSuchElementException(); - } - V val = (V) values[idx]; - idx = findNext(idx + 1); - return val; - } - - private int findNext(int from) { - while (from < capacity && !used[from]) { - from++; - } - return from; - } - }; - } - - @Override - public int size() { - return Long2ObjectOpenHashMap.this.size; - } - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongObjConsumer.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongObjConsumer.java deleted file mode 100644 index a3dd6108e51e1..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongObjConsumer.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -/** - * A consumer that accepts a primitive long key and an object value. - */ -@FunctionalInterface -public interface LongObjConsumer { - void accept(long key, V value); -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongOpenHashSet.java deleted file mode 100644 index 48f3720dff7a7..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongOpenHashSet.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.function.LongConsumer; - -/** - * Open-addressing hash set for primitive long values. - * Not thread-safe. - */ -public class LongOpenHashSet implements Iterable { - - private static final float LOAD_FACTOR = 0.75f; - private static final int MIN_CAPACITY = 16; - - private long[] keys; - private boolean[] used; - private int size; - private int capacity; - private int threshold; - - public LongOpenHashSet() { - this(MIN_CAPACITY); - } - - public LongOpenHashSet(int expectedItems) { - int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems / LOAD_FACTOR) + 1)); - keys = new long[cap]; - used = new boolean[cap]; - capacity = cap; - threshold = (int) (cap * LOAD_FACTOR); - } - - public boolean add(long key) { - int mask = capacity - 1; - int idx = Long2ObjectOpenHashMap.hash(key) & mask; - while (true) { - if (!used[idx]) { - if (size >= threshold) { - rehash(capacity * 2); - return add(key); - } - keys[idx] = key; - used[idx] = true; - size++; - return true; - } - if (keys[idx] == key) { - return false; - } - idx = (idx + 1) & mask; - } - } - - public boolean contains(long key) { - int mask = capacity - 1; - int idx = Long2ObjectOpenHashMap.hash(key) & mask; - while (true) { - if (!used[idx]) { - return false; - } - if (keys[idx] == key) { - return true; - } - idx = (idx + 1) & mask; - } - } - - public int size() { - return size; - } - - public boolean isEmpty() { - return size == 0; - } - - public void forEach(LongConsumer consumer) { - for (int i = 0; i < capacity; i++) { - if (used[i]) { - consumer.accept(keys[i]); - } - } - } - - @Override - public Iterator iterator() { - return new Iterator<>() { - private int idx = findNext(0); - - @Override - public boolean hasNext() { - return idx < capacity; - } - - @Override - public Long next() { - if (idx >= capacity) { - throw new NoSuchElementException(); - } - long val = keys[idx]; - idx = findNext(idx + 1); - return val; - } - - private int findNext(int from) { - while (from < capacity && !used[from]) { - from++; - } - return from; - } - }; - } - - private void rehash(int newCapacity) { - long[] oldKeys = keys; - boolean[] oldUsed = used; - int oldCapacity = capacity; - - capacity = newCapacity; - keys = new long[newCapacity]; - used = new boolean[newCapacity]; - threshold = (int) (newCapacity * LOAD_FACTOR); - size = 0; - - for (int i = 0; i < oldCapacity; i++) { - if (oldUsed[i]) { - add(oldKeys[i]); - } - } - } - - private static int tableSizeFor(int cap) { - int n = cap - 1; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1; - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ObjectIntPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ObjectIntPair.java deleted file mode 100644 index 8134afb091e25..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ObjectIntPair.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -/** - * An immutable pair of an object and an int value. - */ -public record ObjectIntPair(T left, int rightInt) { - public static ObjectIntPair of(T left, int right) { - return new ObjectIntPair<>(left, right); - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/HashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/HashSetTest.java deleted file mode 100644 index 2558e6af38d58..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/HashSetTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.testng.annotations.Test; - -public class HashSetTest { - - @Test - public void testLongOpenHashSetBasic() { - LongOpenHashSet set = new LongOpenHashSet(); - assertTrue(set.isEmpty()); - assertTrue(set.add(1)); - assertTrue(set.add(2)); - assertFalse(set.add(1)); // duplicate - assertEquals(set.size(), 2); - assertTrue(set.contains(1)); - assertTrue(set.contains(2)); - assertFalse(set.contains(3)); - } - - @Test - public void testLongOpenHashSetIterable() { - LongOpenHashSet set = new LongOpenHashSet(); - set.add(3); - set.add(1); - set.add(2); - List values = new ArrayList<>(); - for (long v : set) { - values.add(v); - } - Collections.sort(values); - assertEquals(values, List.of(1L, 2L, 3L)); - } - - @Test - public void testLongOpenHashSetForEach() { - LongOpenHashSet set = new LongOpenHashSet(); - set.add(10); - set.add(20); - List values = new ArrayList<>(); - set.forEach((long v) -> values.add(v)); - Collections.sort(values); - assertEquals(values, List.of(10L, 20L)); - } - - @Test - public void testLongOpenHashSetRehash() { - LongOpenHashSet set = new LongOpenHashSet(4); - for (int i = 0; i < 100; i++) { - set.add(i); - } - assertEquals(set.size(), 100); - for (int i = 0; i < 100; i++) { - assertTrue(set.contains(i)); - } - } - - @Test - public void testIntOpenHashSetBasic() { - IntOpenHashSet set = new IntOpenHashSet(); - assertTrue(set.isEmpty()); - assertTrue(set.add(1)); - assertTrue(set.add(2)); - assertFalse(set.add(1)); // duplicate - assertEquals(set.size(), 2); - assertTrue(set.contains(1)); - assertTrue(set.contains(2)); - assertFalse(set.contains(3)); - } - - @Test - public void testIntOpenHashSetRehash() { - IntOpenHashSet set = new IntOpenHashSet(4); - for (int i = 0; i < 100; i++) { - set.add(i); - } - assertEquals(set.size(), 100); - for (int i = 0; i < 100; i++) { - assertTrue(set.contains(i)); - } - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMapTest.java deleted file mode 100644 index 75c9e4d3d4b94..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMapTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import org.testng.annotations.Test; - -public class Int2ObjectOpenHashMapTest { - - @Test - public void testEmpty() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(); - assertTrue(map.isEmpty()); - assertNull(map.get(1)); - } - - @Test - public void testPutGet() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(); - assertNull(map.put(1, "one")); - assertNull(map.put(2, "two")); - assertEquals(map.get(1), "one"); - assertEquals(map.get(2), "two"); - assertNull(map.get(3)); - assertEquals(map.size(), 2); - } - - @Test - public void testRemove() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - assertEquals(map.remove(1), "one"); - assertNull(map.get(1)); - assertEquals(map.size(), 1); - } - - @Test - public void testRemoveConditional() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(); - String val = "one"; - map.put(1, val); - assertFalse(map.remove(1, "other")); // different ref - assertTrue(map.remove(1, val)); - assertTrue(map.isEmpty()); - } - - @Test - public void testClear() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.clear(); - assertTrue(map.isEmpty()); - assertNull(map.get(1)); - } - - @Test - public void testRehash() { - Int2ObjectOpenHashMap map = new Int2ObjectOpenHashMap<>(4); - for (int i = 0; i < 100; i++) { - map.put(i, i); - } - assertEquals(map.size(), 100); - for (int i = 0; i < 100; i++) { - assertEquals(map.get(i), Integer.valueOf(i)); - } - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMapTest.java deleted file mode 100644 index 7ced026525274..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMapTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import org.testng.annotations.Test; - -public class Long2IntOpenHashMapTest { - - @Test - public void testEmpty() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - assertTrue(map.isEmpty()); - assertEquals(map.get(0), 0); - assertEquals(map.get(1), 0); - } - - @Test - public void testPutGet() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - assertEquals(map.put(1, 10), 0); - assertEquals(map.put(2, 20), 0); - assertFalse(map.isEmpty()); - assertEquals(map.get(1), 10); - assertEquals(map.get(2), 20); - assertEquals(map.get(3), 0); // default - } - - @Test - public void testPutReplace() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - map.put(1, 10); - assertEquals(map.put(1, 100), 10); - assertEquals(map.get(1), 100); - } - - @Test - public void testRemove() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - map.put(1, 10); - map.put(2, 20); - assertEquals(map.remove(1), 10); - assertEquals(map.get(1), 0); // default after removal - assertEquals(map.remove(99), 0); // not present - } - - @Test - public void testGetOrDefault() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - map.put(1, 10); - assertEquals(map.getOrDefault(1, -1), 10); - assertEquals(map.getOrDefault(2, -1), -1); - } - - @Test - public void testComputeIfAbsent() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - assertEquals(map.computeIfAbsent(1, k -> 10), 10); - assertEquals(map.computeIfAbsent(1, k -> 99), 10); - } - - @Test - public void testClear() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(); - map.put(1, 10); - map.put(2, 20); - map.clear(); - assertTrue(map.isEmpty()); - assertEquals(map.get(1), 0); - } - - @Test - public void testRehash() { - Long2IntOpenHashMap map = new Long2IntOpenHashMap(4); - for (int i = 0; i < 100; i++) { - map.put(i, i * 10); - } - for (int i = 0; i < 100; i++) { - assertEquals(map.get(i), i * 10); - } - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMapTest.java deleted file mode 100644 index 6e33a5b531505..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMapTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.testng.annotations.Test; - -public class Long2ObjectOpenHashMapTest { - - @Test - public void testEmpty() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - assertTrue(map.isEmpty()); - assertEquals(map.size(), 0); - assertNull(map.get(0)); - assertNull(map.get(1)); - assertNull(map.remove(1)); - } - - @Test - public void testPutGet() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - assertNull(map.put(1, "one")); - assertNull(map.put(2, "two")); - assertNull(map.put(3, "three")); - assertEquals(map.size(), 3); - assertFalse(map.isEmpty()); - assertEquals(map.get(1), "one"); - assertEquals(map.get(2), "two"); - assertEquals(map.get(3), "three"); - assertNull(map.get(4)); - } - - @Test - public void testPutReplace() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - assertNull(map.put(1, "one")); - assertEquals(map.put(1, "ONE"), "one"); - assertEquals(map.size(), 1); - assertEquals(map.get(1), "ONE"); - } - - @Test - public void testRemove() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.put(3, "three"); - assertEquals(map.remove(2), "two"); - assertEquals(map.size(), 2); - assertNull(map.get(2)); - assertEquals(map.get(1), "one"); - assertEquals(map.get(3), "three"); - } - - @Test - public void testComputeIfAbsent() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - assertEquals(map.computeIfAbsent(1, k -> "one"), "one"); - assertEquals(map.size(), 1); - assertEquals(map.computeIfAbsent(1, k -> "other"), "one"); - assertEquals(map.size(), 1); - } - - @Test - public void testClear() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.clear(); - assertTrue(map.isEmpty()); - assertEquals(map.size(), 0); - assertNull(map.get(1)); - } - - @Test - public void testForEach() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.put(3, "three"); - Map collected = new HashMap<>(); - map.forEach(collected::put); - assertEquals(collected.size(), 3); - assertEquals(collected.get(1L), "one"); - assertEquals(collected.get(2L), "two"); - assertEquals(collected.get(3L), "three"); - } - - @Test - public void testValues() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(1, "one"); - map.put(2, "two"); - map.put(3, "three"); - List values = new ArrayList<>(map.values()); - assertEquals(values.size(), 3); - assertTrue(values.contains("one")); - assertTrue(values.contains("two")); - assertTrue(values.contains("three")); - } - - @Test - public void testRehash() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(4); - for (int i = 0; i < 100; i++) { - map.put(i, i); - } - assertEquals(map.size(), 100); - for (int i = 0; i < 100; i++) { - assertEquals(map.get(i), Integer.valueOf(i)); - } - } - - @Test - public void testZeroKey() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(0, "zero"); - assertEquals(map.get(0), "zero"); - assertEquals(map.size(), 1); - assertEquals(map.remove(0), "zero"); - assertTrue(map.isEmpty()); - } - - @Test - public void testNegativeKeys() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - map.put(-1, "neg1"); - map.put(Long.MIN_VALUE, "min"); - assertEquals(map.get(-1), "neg1"); - assertEquals(map.get(Long.MIN_VALUE), "min"); - } - - @Test - public void testRemoveAndReinsert() { - Long2ObjectOpenHashMap map = new Long2ObjectOpenHashMap<>(); - for (int i = 0; i < 50; i++) { - map.put(i, "v" + i); - } - for (int i = 0; i < 25; i++) { - map.remove(i); - } - assertEquals(map.size(), 25); - for (int i = 25; i < 50; i++) { - assertEquals(map.get(i), "v" + i); - } - // Reinsert - for (int i = 0; i < 25; i++) { - map.put(i, "new" + i); - } - assertEquals(map.size(), 50); - for (int i = 0; i < 25; i++) { - assertEquals(map.get(i), "new" + i); - } - } -} diff --git a/settings.gradle.kts b/settings.gradle.kts index c847d5be210cb..4b8c0b2850a24 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -191,6 +191,10 @@ include("pulsar-broker-auth-oidc") include("pulsar-broker-auth-sasl") include("pulsar-client-auth-sasl") +// Tier 9 — shaded utility modules (in core-modules) +include("pulsar-client-fastutil-minimized") +include("pulsar-broker-fastutil-minimized") + // Tier 10 — shaded client modules (in core-modules) include("pulsar-client-shaded") include("pulsar-client-all")