diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java index e2df71520fe76..f4e9348e9b957 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java @@ -26,6 +26,8 @@ import java.util.Comparator; import java.util.PriorityQueue; import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import org.testng.Reporter; import org.testng.annotations.Test; public class TripleLongPriorityQueueTest { @@ -168,6 +170,51 @@ public void testCompareWithSamePrefix() { pq.close(); } + @Test + public void testRandomizedInterleavedOperations() { + Comparator tupleComparator = Comparator + .comparingLong((long[] tuple) -> tuple[0]) + .thenComparingLong(tuple -> tuple[1]) + .thenComparingLong(tuple -> tuple[2]); + PriorityQueue expected = new PriorityQueue<>(tupleComparator); + long seed = randomSeed("testRandomizedInterleavedOperations"); + Random random = new Random(seed); + + try (TripleLongPriorityQueue actual = new TripleLongPriorityQueue(4)) { + for (int i = 0; i < 20_000; i++) { + String context = "seed=" + seed + " iteration=" + i; + boolean add = expected.isEmpty() || random.nextInt(100) < 65; + if (add) { + long[] tuple = { + randomLongWithEdgeCases(random, 512), + randomLongWithEdgeCases(random, 32), + randomLongWithEdgeCases(random, 512) + }; + actual.add(tuple[0], tuple[1], tuple[2]); + expected.add(tuple); + } else { + assertQueueHead(expected, actual, context); + expected.poll(); + actual.pop(); + } + + assertEquals(actual.size(), expected.size(), context); + if (!expected.isEmpty() && (i & 7) == 0) { + assertQueueHead(expected, actual, context); + } + } + + int drainIteration = 0; + while (!expected.isEmpty()) { + assertQueueHead(expected, actual, "seed=" + seed + " drainIteration=" + drainIteration++); + expected.poll(); + actual.pop(); + } + + assertTrue(actual.isEmpty()); + } + } + @Test public void testShrink() throws Exception { int initialCapacity = 20; @@ -244,4 +291,31 @@ public void testDifferentialRandomPriorityQueue() { } } } + + private static long randomSeed(String testName) { + String configuredSeed = System.getProperty("pulsar.collections.randomSeed"); + long seed = configuredSeed != null ? Long.parseLong(configuredSeed) : ThreadLocalRandom.current().nextLong(); + String message = TripleLongPriorityQueueTest.class.getSimpleName() + "." + testName + " seed=" + seed; + Reporter.log(message, true); + return seed; + } + + private static long randomLongWithEdgeCases(Random random, int bound) { + switch (random.nextInt(64)) { + case 0: + return Long.MIN_VALUE; + case 1: + return Long.MAX_VALUE; + default: + return random.nextInt(bound) - bound / 2L; + } + } + + private static void assertQueueHead(PriorityQueue expected, TripleLongPriorityQueue actual, + String context) { + long[] tuple = expected.peek(); + assertEquals(actual.peekN1(), tuple[0], context); + assertEquals(actual.peekN2(), tuple[1], context); + assertEquals(actual.peekN3(), tuple[2], context); + } }