From 20b321a9e309a151b287b67b6494c8c96f9612b4 Mon Sep 17 00:00:00 2001 From: David Capwell Date: Tue, 6 Feb 2024 11:41:41 -0800 Subject: [PATCH 01/13] Refactort CommandsForRange to work based off disk state rather than in-memory state --- .../accord/utils/CheckpointIntervalArray.java | 222 ++++ .../utils/CheckpointIntervalArrayBuilder.java | 1134 +++++++++++++++++ .../accord/utils/SearchableRangeList.java | 197 +-- .../utils/SearchableRangeListBuilder.java | 1067 +--------------- .../java/accord/utils/async/AsyncChains.java | 12 + .../main/java/accord/utils/random/Picker.java | 27 +- .../src/test/java/accord/utils/Gen.java | 30 + .../src/test/java/accord/utils/Gens.java | 266 ++++ .../src/test/java/accord/utils/Property.java | 2 +- .../accord/utils/SearchableRangeListTest.java | 116 ++ .../groovy/accord.java-conventions.gradle | 2 +- 11 files changed, 1844 insertions(+), 1231 deletions(-) create mode 100644 accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java create mode 100644 accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java create mode 100644 accord-core/src/test/java/accord/utils/SearchableRangeListTest.java diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java new file mode 100644 index 0000000000..84063da17e --- /dev/null +++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.utils; + +import java.util.Arrays; + +import accord.utils.CheckpointIntervalArrayBuilder.Accessor; +import net.nicoulaj.compilecommand.annotations.Inline; + +import static accord.utils.SortedArrays.Search.CEIL; + +public class CheckpointIntervalArray +{ + // scan distance can be kept very small as we guarantee to use at most linear extra space even with a scan distance of zero + static final int MAX_SCAN_DISTANCE = 255; + protected static final int BIT30 = 0x40000000; + protected static final int BIT29 = 0x20000000; + + final Ranges ranges; + + /** + * The lower bound for each checkpoint. + * The checkpoint {@code i} applies to all ranges (incl) starting from {@code lowerBounds[i]}, + * but before (excl) {@code lowerBounds[i+1]}. + */ + final int[] lowerBounds; + + /** + * Logically one entry per checkpoint, mapping {@link #lowerBounds} to {@link #checkpointLists}, + * however we also encode an additional byte per entry representing the scan distance for the + * ranges handled by this checkpoint. These are grouped into an integer per four mappings, i.e. + * we encode batches of five ints, with the first int containing the four scan distances for the + * next four checkpoints, and the following four ints containing the respective offsets into + * {@link #checkpointLists}. + *

+ * [0.........32b.........64b.........96b........128b........160b........192b] + * [ d1 d2 d3 d4 mapping1 mapping2 mapping3 mapping4 d5 d6 d7 d8 ] + */ + final int[] headers; + + /** + * A list of indexes in {@link #ranges} contained by each checkpoint; checkpoints are + * mapped from {@link #lowerBounds} by {@link #headers}. + *

+ * Entries are sorted in descending order by the end of the range they cover, so that + * a search of this collection may terminate as soon as it encounters a range that does + * not cover the item we are searching for. + *

+ * This collection may contain negative values, in which case these point to other + * checkpoints, whose direct contents (i.e. their non-negative values) we may + * search. + *

+ */ + final int[] checkpointLists; + + public final int maxScanAndCheckpointMatches; + private final Accessor accessor; + + public CheckpointIntervalArray(Accessor accessor, Ranges ranges, + int[] lowerBounds, int[] headers, int[] checkpointLists, int maxScanAndCheckpointMatches) + { + this.accessor = accessor; + this.ranges = ranges; + this.lowerBounds = lowerBounds; + this.headers = headers; + this.checkpointLists = checkpointLists; + this.maxScanAndCheckpointMatches = maxScanAndCheckpointMatches; + } + + @Inline + public int forEach(Range range, IndexedQuadConsumer forEachScanOrCheckpoint, IndexedRangeQuadConsumer forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex) + { + return forEach(accessor.start(range), accessor.end(range), forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex); + } + + public int forEach(Key startKey, Key endKey, IndexedQuadConsumer forEachScanOrCheckpoint, IndexedRangeQuadConsumer forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex) + { + if (accessor.size(ranges) == 0 || minIndex == accessor.size(ranges)) + return minIndex; + + var c = accessor.keyComparator(); + int end = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), endKey, (a, b) -> c.compare(a, accessor.start(b)), CEIL); + if (end < 0) end = -1 - end; + if (end <= minIndex) return minIndex; + + int floor = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), startKey, (a, b) -> c.compare(a, accessor.start(b)), CEIL); + int start = floor; + if (floor < 0) + { + // if there's no precise match on start, step backwards; + // if this range does not overlap us, step forwards again for start + // but retain the floor index for performing scan and checkpoint searches from + // as this contains all ranges that might overlap us (whereas those that end + // after us but before the next range's start would be missed by the next range index) + start = floor = -2 - floor; + if (start < 0) + start = floor = 0; + else if (c.compare(accessor.end(ranges, 
start), startKey) <= 0) + ++start; + } + + // Since endInclusive() != startInclusive(), so no need to adjust start/end comparisons + return forEach(start, end, floor, startKey, 0, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex); + } + + @Inline + protected int forEach(int start, int end, int floor, Key startBound, int cmpStartBoundWithEnd, + IndexedQuadConsumer forEachScanOrCheckpoint, IndexedRangeQuadConsumer forEachRange, + P1 p1, P2 p2, P3 p3, P4 p4, int minIndex) + { + if (start < minIndex) start = minIndex; + + // find the checkpoint array, so we know how far to step back + int checkpoint = Arrays.binarySearch(lowerBounds, floor); + if (checkpoint < 0) checkpoint = -2 - checkpoint; + if (checkpoint < 0) return end; + + int headerBaseIndex = (checkpoint / 4) * 5; + int headerSubIndex = checkpoint & 3; + int headerListIndex = headerBaseIndex + 1 + headerSubIndex; + + int scanDistance = (headers[headerBaseIndex] >>> (8 * headerSubIndex)) & 0xff; + int checkpointStart = headers[headerListIndex]; + int checkpointEnd = headers[headerListIndex + (headerSubIndex + 5)/4]; // skip the next header + + if (scanDistance == MAX_SCAN_DISTANCE) + { + scanDistance = -checkpointLists[checkpointStart++]; + Invariants.checkState(scanDistance >= MAX_SCAN_DISTANCE); + } + + // NOTE: we visit in approximately ascending order, and this is a requirement for correctness of RangeDeps builders + // Only the checkpoint is visited in uncertain order, but it is visited entirely, before the scan matches + // and the range matches + int minScanIndex = Math.max(floor - scanDistance, minIndex); + var c = accessor.keyComparator(); + for (int i = checkpointStart; i < checkpointEnd ; ++i) + { + int ri = checkpointLists[i]; + if (ri < 0) + { + int subStart, subEnd; + if ((ri & BIT30) != 0) + { + subStart = ri & 0xfffff; + subEnd = subStart + ((ri >>> 20) & 0x1ff); + } + else if ((ri & BIT29) != 0) + { + subStart = ri & 0x1fffffff; + subEnd = Integer.MAX_VALUE; + } + else + { + int 
length = ri & 0x1fffffff; + subStart = checkpointLists[++i]; + subEnd = subStart + length; + } + + for (int j = subStart ; j < subEnd ; ++j) + { + ri = checkpointLists[j]; + if (ri < 0) + continue; + + if (c.compare(accessor.end(ranges, ri), startBound) <= cmpStartBoundWithEnd) + break; + + if (ri >= minIndex && ri < minScanIndex) + forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri); + } + } + else + { + // if startBound is key, we cannot be equal to it; + // if startBound is a Range start, we also cannot be equal to it due to the requirement that + // endInclusive() != startInclusive(), so equality really means inequality + if (c.compare(accessor.end(ranges, ri), startBound) <= cmpStartBoundWithEnd) + break; + + if (ri >= minIndex && ri < minScanIndex) + forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri); + } + } + + for (int i = minScanIndex; i < floor ; ++i) + { + if (c.compare(accessor.end(ranges, i), startBound) > cmpStartBoundWithEnd) + forEachScanOrCheckpoint.accept(p1, p2, p3, p4, i); + } + + if (start == end) + return end; + + forEachRange.accept(p1, p2, p3, p4, start, end); + return end; + } +} diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java new file mode 100644 index 0000000000..b82c6fda3a --- /dev/null +++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java @@ -0,0 +1,1134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.utils; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Objects; +import java.util.TreeSet; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; + +import static accord.utils.ArrayBuffers.cachedInts; +import static accord.utils.CheckpointIntervalArray.MAX_SCAN_DISTANCE; +import static accord.utils.SortedArrays.Search.CEIL; + +public class CheckpointIntervalArrayBuilder +{ + public enum Strategy + { + /** + * Do not tenure any ranges that are scannable from the currently in-effect max scan distance. + * This means we probably do less work on construction, but that our measure of the match count + * at any point is inaccurate, and so our heuristics for when to write checkpoints may be wrong, + * leading to more checkpoints than necessary. + */ + FAST, + + /** + * Tenure every range covering more than goalScanDistance. Any within max scan distance will also + * update the scan distance, so that they will be filtered when a checkpoint is written. But + * in the meantime they permit accurate tracking of the number of matches a query can return, + * permitting our complexity calculations (that determine when checkpoints should be written, and + * what our maximum scan distance should be), to be accurate. 
This can avoid bouncing between + * two extremes, where a low max scan distance tenures and correctly detects a desirable larger scan + * distance, which we rollover and this prevents us tenuring and tracking the number of matches, so + * that we then pick a low max scan distance (and thereby also write a new checkpoint) + */ + ACCURATE + } + + /** + * Should we maintain pointers to prior checkpoints that we may reference instead of reserializing + * the remaining contents. This is cheap to visit as we stop enumerating as soon as we encounter + * an entry that no longer covers us. We use some simple heuristics when deciding whether to do + * this, namely that there are at least two entries (so we save one checkpoint entry) and that + * there is at least one direct entry for each indirect/link entry in the range we will link. + */ + public enum Links + { + LINKS, + NO_LINKS + } + + public interface Accessor + { + boolean endInclusive(Ranges ranges); + int size(Ranges ranges); + Range get(Ranges ranges, int index); + RoutingKey start(Ranges ranges, int index); + RoutingKey start(Range range); + RoutingKey end(Ranges ranges, int index); + RoutingKey end(Range range); + Comparator keyComparator(); + int binarySearch(Ranges ranges, int from, int to, RoutingKey find, AsymmetricComparator comparator, SortedArrays.Search op); + } + + private static final int BIT31 = 0x80000000; + private static final int BIT30 = 0x40000000; + private static final int BIT29 = 0x20000000; + static final int MIN_INDIRECT_LINK_LENGTH = 2; + + final Accessor accessor; + final boolean isAccurate; + final boolean withLinks; + final Ranges ranges; + + int[] bounds; + int[] headers; + int[] lists; + int checkpointCount, headerPointer, listCount; + + final Scan scan; + final TenuredSet tenured; + final PendingCheckpoint pending = new PendingCheckpoint<>(); + + // track the maximum possible number of entries we can match with both a scan + checkpoint lookup + // this is an over-estimate and may be 
used by consumers to allocate out-of-order buffers for visitations + int maxScanAndCheckpointMatches; + + public CheckpointIntervalArrayBuilder(Accessor accessor, + Ranges ranges, + Strategy strategy, Links links) + { + this(accessor, ranges, Math.min(MAX_SCAN_DISTANCE, 34 - Integer.numberOfLeadingZeros(accessor.size(ranges))), strategy, links); + } + + public CheckpointIntervalArrayBuilder(Accessor accessor, + Ranges ranges, + int goalScanDistance, + Strategy strategy, Links links) + { + this.accessor = accessor; + this.isAccurate = strategy == Strategy.ACCURATE; + this.withLinks = links == Links.LINKS; + Invariants.checkArgument(goalScanDistance <= MAX_SCAN_DISTANCE); + Invariants.checkArgument(goalScanDistance > 0); + this.ranges = ranges; + this.scan = new Scan<>(accessor); + this.tenured = new TenuredSet<>(accessor); + init(ranges, goalScanDistance); + } + + void init(Ranges ranges, int goalScanDistance) + { + // we write checkpoints at least goalScanDistance apart + scan.init(goalScanDistance); + ArrayBuffers.IntBuffers cachedInts = cachedInts(); + // ask for int buffers in descending order of size + int size = accessor.size(ranges); + this.lists = cachedInts.getInts(size); // this one might need to grow + // +2 to round-up each division, and +2 to account for the final entry (which might require an empty scan distance header) + this.headers = cachedInts.getInts(((size / goalScanDistance) * 5) / 4 + 4); + this.bounds = cachedInts.getInts(size / goalScanDistance + 1); + } + + public interface Factory + { + T build(Ranges ranges, int[] bounds, int[] headers, int[] lists, int maxScanAndCheckpointMatches); + } + + /** + * Walk over each range, looking ahead by {@link #maxScanDistance} to decide if a range should + * be tenured (written to a checkpoint) or scanned; the maximum scan distance is determined by the + * number of open tenured entries, i.e. 
the minimum number of results we can expect to be returned + * (or, if greater, the logarithm of the number of ranges in the collection). + *

+ * Once we encounter a range that should be tenured, either write a checkpoint immediately + * or make a note of the position we must scan to from the last entry in this checkpoint + * and wait until it is permitted to write a checkpoint. This range will be tenured either + * way for the following checkpoint. + *

+ * The only reason not to write a checkpoint immediately is in the case we would breach + * our linear space complexity limit, which is imposed by ensuring we have a space between + * checkpoints at least as large as the number of entries written to the last checkpoint, + * discounted by the number of entries we have removed from the tenured collection since + * the last checkpoint. + */ + public CheckpointIntervalArray build() + { + return build((ranges, bounds, headers, lists, maxScanAndCheckpointMatches) -> new CheckpointIntervalArray<>(accessor, ranges, bounds, headers, lists, maxScanAndCheckpointMatches)); + } + + public T build(Factory factory) + { + int size = accessor.size(ranges); + for (int ri = 0 ; ri < size ; ++ri) + { + // write a checkpoint if we meet our linear space complexity requirements + // and we either have a tenured range that we must scan, + // or the scan distance is now much larger than the minimum number of search results + if (shouldWriteCheckpoint(ri)) + writeCheckpoint(ri); + + // either tenure or update scan distance, potentially writing a checkpoint + tenureOrScan(ri); + tenured.untenure(ri); + } + + // write our final pending checkpoint + writeCheckpoint(size); + closeHeaders(); + + ArrayBuffers.IntBuffers cachedInts = cachedInts(); + int[] lists = cachedInts.completeAndDiscard(this.lists, listCount); + int[] headers = cachedInts.completeAndDiscard(this.headers, headerPointer); + int[] bounds = cachedInts.completeAndDiscard(this.bounds, checkpointCount); + return factory.build(ranges, bounds, headers, lists, maxScanAndCheckpointMatches); + } + + /** + * Categorise the candidateIdx as either scannable, and if so update the scan distance; + * or unscannable, in which case add it to the {@link #tenured} collection. 
+ * Note, that in ACCURATE mode we tenure the item if it is outside of the goalScanDistance + * so we may track O(k) accurately above the O(lg2(N)) search and default scan distance, + * but we still update the scan distance so that the checkpoint will exclude this entry. + */ + private void tenureOrScan(int index) + { + Invariants.checkArgument(index >= 0); + + // then either migrate the index to pendingTenured, or ensure it will be scanned + RoutingKey end = accessor.end(ranges, index); + int scanLimit = scanLimit(index, isAccurate ? scan.goal : maxScanDistance()); + if (shouldTenure(end, scanLimit)) + { + int lastIndex = tenured.tenure(end, index, ranges, scanLimit + 1); + if (lastIndex - index > maxScanDistance()) scan.tenured(index); + else if (!isAccurate) throw new IllegalStateException(); + else scan.updateScanDistance(index, lastIndex - index, this); + } + else + { + // TODO (low priority, efficiency): if the prior checkpoint has a scan distance >= this one, + // and <= 50% more than this one and there's no scanMustReachIndex nor tenuredRanges, don't + // write a new checkpoint (perhaps split shouldWriteCheckpoint logic in two) + scan.update(end, index, ranges, scanLimit, this); + } + } + + /** + * We are forbidden from writing a checkpoint nearer than this to a prior checkpoint. + * This imposes our linear space complexity bounds, while not harming our O(log2(N) + K) + * complexity bounds, as we guarantee minimumSpan is never more than the number of query + * results. + */ + private int minimumSpan() + { + return Math.max(scan.goal(), tenured.minimumSpan()); + } + + private int maxScanDistance() + { + // minimumSpan() reduces overtime, but there is no reason to reduce our scan distance + // for tenuring below the scan distance we will write + return Math.max(scan.watermark(), minimumSpan()); + } + + /** + * The index after the last index we can scan from {@code atIndex} with at most {@code maxScanDistance}. 
+ */ + private int scanLimit(int atIndex, int maxScanDistance) + { + return Math.min(1 + atIndex + maxScanDistance, accessor.size(ranges)); + } + + private boolean shouldTenure(RoutingKey end, int scanLimit) + { + return scanLimit < accessor.size(ranges) && accessor.keyComparator().compare(end, accessor.start(ranges, scanLimit)) > 0; + } + + private boolean canWriteCheckpoint(int atIndex) + { + return atIndex - pending.atIndex >= minimumSpan(); + } + + private boolean shouldWriteCheckpoint(int atIndex) + { + if (!canWriteCheckpoint(atIndex)) + return false; + + // TODO (desired, efficiency): consider these triggers + if (scan.mustCheckpointToScanTenured(atIndex, maxScanDistance())) + return true; + + return scan.hasMaybeDivergedFromMatchSize(tenured); + } + + /** + * Write a checkpoint for ranges[prevCheckpointIndex...ri) + * + * 1) Finalise the scan distance + * 2) Write the header + * 3) Filter the pending tenured ranges to remove those we can scan + * 4) Write this list out + * 5) Setup a link to this list, if it is large enough + * 6) Rollover the scan, tenured and pending structures for the new pending checkpoint + */ + private void writeCheckpoint(int nextCheckpointIndex) + { + int lastIndex = nextCheckpointIndex - 1; + int scanDistance = scan.finalise(lastIndex); + scanDistance = extendScanDistance(lastIndex, scanDistance); + + if (pending.atIndex < 0) + { + // we don't have any checkpoints pending, so don't try to finalise it + // but if the new checkpoint doesn't cover index 0, insert a new empty + // checkpoint for the scan distance + if (nextCheckpointIndex > 0) + { + // setup an initial empty checkpoint to store the first scan distance + maxScanAndCheckpointMatches = scanDistance; + writeHeader(scanDistance, 0); + } + } + else + { + writeHeader(scanDistance, pending.atIndex); + int maxCheckpointMatchCount = pending.filter(scanDistance, lastIndex); + int listIndex = writeList(pending); + if (withLinks) + pending.setupLinkChain(tenured, listIndex, 
listCount); + maxScanAndCheckpointMatches = Math.max(maxScanAndCheckpointMatches, scanDistance + maxCheckpointMatchCount); + } + + savePendingCheckpointAndResetScanDistance(nextCheckpointIndex); + } + + private void savePendingCheckpointAndResetScanDistance(int checkpointIndex) + { + // use the tail of checkpointListBuf to buffer ranges we plan to tenure + ensureCapacity(tenured.count() + scan.watermark()); + + scan.reset(); + + if (isAccurate) + { + // TODO (low priority, efficiency): we can shift back the existing scanDistance if it's far enough from + // the next checkpoint. this might permit us to skip some comparisons + scan.resetPeakMax(tenured); + for (Tenured tenured : this.tenured) + { + int distanceToEnd = (tenured.lastIndex - checkpointIndex); + if (distanceToEnd >= scan.peakMax) + break; + + int scanDistance = tenured.lastIndex - tenured.index; + if (scanDistance <= scan.peakMax) + scan.updateScanDistance(tenured.index, scanDistance, null); + } + + if (scan.watermark() < scan.goal) + { + int ri = Scan.minScanIndex(checkpointIndex, scan.goal); + while (ri < checkpointIndex) + { + RoutingKey end = accessor.end(ranges, ri); + int scanLimit = scanLimit(ri, scan.peakMax); + if (!shouldTenure(end, scanLimit)) + scan.update(end, ri, ranges, scanLimit, null); + ++ri; + } + } + } + else + { + // the maximum scan distance that could ever have been adopted for last chunk + int oldPeakMax = scan.peakMax(); + // the minimum scan distance we will start with for processing the proceeding ranges + // note: this may increase if we decide to tenure additional ranges, at which point it will be the actual newPeakMax + int newMinPeakMax = scan.newPeakMax(tenured); + int minUntenuredIndex = scan.minUntenuredIndex(checkpointIndex, tenured); + int minScanIndex = Scan.minScanIndex(checkpointIndex, newMinPeakMax); + + // we now make sure tenured and scan are correct for the new parameters. 
+ // 1) if our peakMax is lower then we need to go back and find items to tenure that we previously marked for scanning + // 2) we must also reset our scan distances + + // since our peakMax is determined by tenured.count(), but we are tenuring items here we keep things simple + // and do not account for those items we tenure but would later permit to scan as our peakMax grows + + int ri = Math.min(minUntenuredIndex, minScanIndex); + while (ri < checkpointIndex) + { + RoutingKey end = accessor.end(ranges, ri); + int newPeakMax = scan.newPeakMax(tenured); + int scanLimit = scanLimit(ri, newPeakMax); + if (shouldTenure(end, scanLimit)) + { + // note: might have already been tenured + // in this case our untenureLimit may be incorrect, but we won't use it + if (ri >= minUntenuredIndex && newPeakMax < oldPeakMax) + tenured.tenure(end, ri, ranges, scanLimit + 1, scanLimit(ri, oldPeakMax)); + } + else + { + // this might effectively remove a previously tenured item + scan.update(end, ri, ranges, scanLimit, null); + } + ++ri; + } + + scan.resetPeakMax(tenured); + } + + pending.atIndex = checkpointIndex; + pending.clear(); + tenured.rollover(pending); + } + + private int extendScanDistance(int lastIndex, int scanDistance) + { + // now we've established our lower bound on scan distance, see how many checkpoints we can remove + // by increasing our scan distance so that it remains proportional to the number of results returned + // TODO (low priority, efficiency): can reduce cost here by using scanDistances array for upper bounds to scan distance + int maxScanDistance = scan.goal() + 2 * Math.min(tenured.count(), tenured.countAtPrevCheckpoint()); + if (maxScanDistance >= 1 + scanDistance + scanDistance/4 && pending.count() >= (maxScanDistance - scanDistance)/2) + { + int removeCount = 0; + int extendedScanDistance = scanDistance; + int target = (maxScanDistance - scanDistance)/2; + for (int i = 0 ; i < pending.count() ; ++i) + { + Tenured t = pending.get(i); + if (t.index < 
0) + continue; + + int distance = Math.min(lastIndex, t.lastIndex) - t.index; + if (distance <= scanDistance) + continue; // already scanned or untenured + + if (distance <= maxScanDistance) + { + ++removeCount; + extendedScanDistance = Math.max(extendedScanDistance, distance); + if (extendedScanDistance == maxScanDistance && removeCount >= target) + break; + } + } + + // TODO (low priority, efficiency): should perhaps also gate this decision on the span we're covering + // algorithmically, however, so long as we are under maxScanDistance we are fine + if (removeCount >= (extendedScanDistance - scanDistance)/2) + scanDistance = extendedScanDistance; + } + return scanDistance; + } + + int writeList(PendingCheckpoint pending) + { + int startIndex = listCount; + for (int i = pending.count() - 1 ; i >= 0 ; --i) + { + Tenured t = pending.get(i); + if (t.index >= 0) + { + lists[listCount++] = t.index; + } + else + { + int index = t.index & ~BIT31; + int length = t.linkLength & ~BIT31; + if (length <= 0xff && index <= 0xfffff) + { + lists[listCount++] = BIT31 | BIT30 | (length << 20) | index; + } + else if (t.linkLength >= 0 && length < BIT30) + { + lists[listCount++] = BIT31 | BIT29 | index; + } + else + { + lists[listCount++] = BIT31 | length; + lists[listCount++] = BIT31 | pending.count(); + } + } + } + return startIndex; + } + + void writeHeader(int scanDistance, int lowerBound) + { + int headerScanDistance = Math.min(scanDistance, MAX_SCAN_DISTANCE); + + if ((checkpointCount & 3) == 0) + headers[headerPointer++] = headerScanDistance; + else + headers[headerPointer - (1 + (checkpointCount & 3))] |= headerScanDistance << (8 * (checkpointCount & 3)); + + bounds[checkpointCount++] = lowerBound; + headers[headerPointer++] = listCount; + + if (scanDistance >= MAX_SCAN_DISTANCE) + lists[listCount++] = -scanDistance; // serialize as a negative value so we ignore it in most cases automatically + } + + void closeHeaders() + { + // write our final checkpoint header + if 
((checkpointCount & 3) == 0) headers[headerPointer++] = 0; + headers[headerPointer++] = listCount; + } + + void ensureCapacity(int maxPendingSize) + { + if (listCount + maxPendingSize >= lists.length) + lists = cachedInts().resize(lists, listCount, lists.length + lists.length/2 + maxPendingSize); + } + + static class Scan + { + final Accessor accessor; + /** the scan distance we are aiming for; should be proportional to log2(N) */ + int goal; + + /** the indexes at which we increased the scan distance, and the new scan distance */ + int[] distances = new int[16]; + /** the number of unique scan distances we have adopted since the last checkpoint */ + int count; + /** the highest scan distance we have adopted (==scanDistance(scanDistanceCount-1)) */ + int watermark; + /** + * the first index we have tenured a range from, but for which we did not immediately write a new checkpoint + * we *must* scan at least from the last index in the checkpoint to here + */ + int scanTenuredAtIndex = -1; + + /** The maximum (i.e. initial) scan distance limit we have used since the last attempted checkpoint write */ + int peakMax; + + Scan(Accessor accessor) + { + this.accessor = accessor; + } + + void init(int goalScanDistance) + { + goal = peakMax = goalScanDistance; + } + + private void update(RoutingKey end, int atIndex, Ranges ranges, int scanLimit, CheckpointIntervalArrayBuilder checkpoint) + { + int newScanDistance = find(end, atIndex, ranges, scanLimit, watermark); + updateScanDistance(atIndex, newScanDistance, checkpoint); + } + + private void updateScanDistance(int atIndex, int newScanDistance, CheckpointIntervalArrayBuilder checkpoint) + { + if (newScanDistance > watermark) + { + // TODO (desired, efficiency): we don't mind slight increases to the watermark; + // should really look at scan distance history and ensure we haven't e.g. 
doubled since + // some earlier point (and should track the match count + scan distance at each bump + // to check overall work hasn't increased too much) + if (checkpoint != null && checkpoint.canWriteCheckpoint(atIndex)) + checkpoint.writeCheckpoint(atIndex); + + watermark = newScanDistance; + if (count * 2 == distances.length) + distances = Arrays.copyOf(distances, distances.length * 2); + distances[count * 2] = newScanDistance; + distances[count * 2 + 1] = atIndex; + ++count; + } + } + + private int find(RoutingKey end, int atIndex, Ranges ranges, int scanLimit, int currentScanDistance) + { + var c = accessor.keyComparator(); + int lowerIndex = accessor.binarySearch(ranges, atIndex + currentScanDistance, scanLimit, end, (e, s) -> c.compare(e, accessor.start(s)), CEIL); + if (lowerIndex < 0) lowerIndex = -2 - lowerIndex; + else lowerIndex -= 1; + return lowerIndex - atIndex; + } + + boolean isAboveGoal() + { + return watermark > goal; + } + + int watermark() + { + return watermark; + } + + int goal() + { + return goal; + } + + int distanceToTenured(int lastIndex) + { + return scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0; + } + + boolean mustCheckpointToScanTenured(int checkpointIndex, int maxScanDistance) + { + return scanTenuredAtIndex >= 0 && checkpointIndex - scanTenuredAtIndex >= maxScanDistance; + } + + /** + * Are we scanning a much longer distance than the minimum number of matches we know a query will return? 
+ * Note: with Strategy.FAST, {@code tenured.count()} gets less accurate as scan distance increases, so this + * will bounce around triggering checkpoints due to the larger scan distance, resetting the scan distance + * and starting again + */ + boolean hasMaybeDivergedFromMatchSize(TenuredSet tenured) + { + return isAboveGoal() && tenured.count() < watermark()/2; + } + + private int distance(int i) + { + return distances[i*2]; + } + + private int index(int i) + { + return distances[i*2+1]; + } + + int finalise(int lastIndex) + { + Invariants.checkState(distanceToTenured(lastIndex) <= Math.max(watermark(), peakMax())); + + int scanDistance = watermark; + // then, compute the minimum scan distance implied by any tenured ranges we did not immediately + // write a checkpoint for - we *must* scan back as far as this record + int minScanDistance = scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0; + if (minScanDistance > scanDistance) + { + // if this minimum is larger than the largest scan distance we picked up for non-tenured ranges + // then we are done, as there's nothing we can save + scanDistance = minScanDistance; + } + else if (scanDistance > 0) + { + // otherwise, we can look to see if any of the scan distances we computed overflow the checkpoint, + // i.e. 
where no records served by this checkpoint need to scan the full distance to reach it + int distanceToLastScanIndex = lastIndex - index(count -1); + // if the distance to the last scan index is larger than its scan distance, we have overflowed; + if (distanceToLastScanIndex < scanDistance) + { + minScanDistance = Math.max(distanceToLastScanIndex, minScanDistance); + // loop until we find one that doesn't overflow, as this is another minimum scan distance + int i = count - 1; + while (--i >= 0) + { + int distance = lastIndex - index(i); + if (distance >= distance(i)) break; + else if (distance > minScanDistance) minScanDistance = distance; + } + if (i >= 0) scanDistance = Math.max(minScanDistance, distance(i)); + else scanDistance = minScanDistance; + } + } + + return scanDistance; + } + + void reset() + { + // we could in theory reset our scan distance using the contents of scanDistance[] + // but it's a bit complicated, as we want to have the first item to increment the scan distance + // so that we can use it in writeScanDistance to shrink the scan distance; + // jumping straight to the highest scan distance breaks this + count = 0; + scanTenuredAtIndex = -1; + watermark = 0; + } + + void resetPeakMax(TenuredSet tenured) + { + peakMax = newPeakMax(tenured); + } + + int peakMax() + { + return peakMax; + } + + int newPeakMax(TenuredSet tenured) + { + return Math.max(goal, tenured.count()); + } + + /** + * The minimum index containing a range that might need to be tenured, if we have a smaller max scan distance than before + */ + int minUntenuredIndex(int checkpointIndex, TenuredSet tenured) + { + int minUntenuredIndex = Math.max(0, (checkpointIndex - 1) - watermark()); + // the maximum scan distance that could ever have been adopted for the ranges processed since last checkpoint + int oldPeakMax = peakMax; + int newMinPeakMax = newPeakMax(tenured); + if (newMinPeakMax < oldPeakMax) + { + // minimise range we unnecessarily re-tenure over + // TODO (low priority, 
efficiency): see if can also use to reduce range we re-scan e.g. can recycle + // scanDistances contents if we know we won't need to step back further at next checkpoint + for (int i = count - 1; i >= 0 ; --i) + { + if (index(i) <= minUntenuredIndex) + break; + if (distance(i) <= newMinPeakMax) + return i + 1 == count ? index(i) : index(i + 1) - 1; + } + } + return minUntenuredIndex; + } + + /** + * Record that a range at this index has been tenured, so that we can track how far back + * we need to scan to determine how long we can defer writing a new checkpoint while still + * being able to scan it. + * + * TODO (low priority, efficiency): when a checkpoint is written, we should consider moving it + * earlier if the scan distance is increased primarily because of this index, and the tenured + * collection is otherwise unchanged (so can be written with minimal overhead) + */ + void tenured(int atIndex) + { + if (scanTenuredAtIndex < 0) + scanTenuredAtIndex = atIndex; + } + + static int minScanIndex(int checkpointIndex, int scanDistance) + { + return Math.max(0, (checkpointIndex - 1) - scanDistance); + } + + @Override + public String toString() + { + return "Scan{watermark=" + watermark + ", tenured=" + scanTenuredAtIndex + '}'; + } + } + + /** + * Record-keeping for a range we have decided is not scannable + */ + static class Tenured implements Comparable> + { + final Accessor accessor; + /** + * The end of the tenured range covered by the contents referred to by {@link #index} + */ + RoutingKey end; + + /** + *

<ul> + * <li>If positive, this points to {@code ranges[index]}</li> + * <li>If negative, this points to an entry in {@link #lists}; + * see {@link SearchableRangeList#checkpointLists}</li> + * </ul>
+ */ + int index; + + /** + * The last index in {@link #ranges} covered by this tenured range + */ + int lastIndex; + + /** + * set when this record is serialized in a checkpoint list to either: + *
<ul> + * <li>point to itself, in which case no action should be + * taken on removal (it is only retained for size bookkeeping); or</li> + * <li>point to the next item in the checkpoint list; the first + * such element removed triggers the clearing of the checkpoint + * list so that its entries are re-inserted in the next checkpoint</li> + * </ul>
+ */ + Tenured next; + + /** + * Only set for link entries, i.e. where {@code index < 0}. + *
<ul> + * <li>if positive, the length is optional as we will terminate safely using the end bound filtering</li> + * <li>if negative, the low 31 bits must be retrieved as the length for safe iteration</li> + * </ul>
+ */ + int linkLength; + + Tenured(Accessor accessor, RoutingKey end, int index) + { + this.accessor = accessor; + this.end = end; + this.index = index; + } + + @Override + public int compareTo(@Nonnull Tenured that) + { + int c = accessor.keyComparator().compare(this.end, that.end); + // we sort indexes in reverse order so later tenured items find the earlier ones with same end when searching + // for higher entries for the range of indexes to search, and + if (c == 0) c = -Integer.compare(this.index, that.index); + return c; + } + + @Override + public String toString() + { + return "Tenured{end=" + end + ", index=" + index + '}'; + } + } + + /** + * The set of ranges that we intend to write to checkpoints that remain open at the current point in the iteration + * This collection may be filtered before serialization, but every member will be visited either by scanning + * or visiting the checkpoint list + * TODO (low priority, efficiency): save garbage by using an insertion-sorted array for collections where + * this is sufficient. later, introduce a mutable b-tree supporting object recycling. we would also like + * to use a collection that permits us to insert and return a finger into the tree so we can find the + * successor as part of insertion, and that permits constant-time first() calls + */ + static class TenuredSet extends TreeSet> + { + final Accessor accessor; + /** + * the number of direct tenured entries (i.e. 
ignoring link entries) + * this is used to provide a minimum bound on the number of results a range query can return + * note: with Strategy.FAST this gets less accurate as the span distance increases + */ + int directCount; + int directCountAtPrevCheckpoint; + int minSpan; + + // a stack of recently used EndAndIndex objects - used only for the duration of a single build + Tenured reuse, pendingReuse, pendingReuseTail; + + TenuredSet(Accessor accessor) + { + this.accessor = accessor; + } + + int count() + { + return directCount; + } + + int countAtPrevCheckpoint() + { + return directCountAtPrevCheckpoint; + } + + /** + * We require a checkpoint to cover a distance at least as large as the number of tenured ranges leftover + * since the prior checkpoint, to ensure these require at most linear additional space, while not requiring + * more than O(k) additional complexity on search (i.e., we will scan a number of elements at most equal + * to the number we have to visit in the checkpoint). + * + * We achieve this by recording the minimum number of match results as of the prior checkpoint (i.e. {@link #count()}) + * and discounting it by one each time we untenure a range, so that for each tenured range from the prior checkpoint + * we have either untenured a range or processed at least one additional input. 
+ */ + int minimumSpan() + { + return minSpan; + } + + private int tenure(RoutingKey end, int index, Ranges ranges, int minUntenureIndex) + { + return tenure(newTenured(end, index), ranges, minUntenureIndex, accessor.size(ranges)); + } + + private void tenure(RoutingKey end, int index, Ranges ranges, int minUntenureIndex, int untenureLimit) + { + tenure(newTenured(end, index), ranges, minUntenureIndex, untenureLimit); + } + + private int tenure(Tenured tenure, Ranges ranges, int untenureMinIndex, int untenureLimit) + { + if (!add(tenure)) + return tenure.lastIndex; + + Tenured next = higher(tenure); + if (next != null) + untenureLimit = Math.min(untenureLimit, next.lastIndex + 1); + var c = accessor.keyComparator(); + int untenureIndex = accessor.binarySearch(ranges, untenureMinIndex, untenureLimit, tenure.end, (e, s) -> c.compare(e, accessor.start(s)), CEIL); + if (untenureIndex < 0) untenureIndex = -1 - untenureIndex; + tenure.lastIndex = untenureIndex - 1; + Invariants.checkState(c.compare(tenure.end, accessor.start(ranges, tenure.lastIndex)) > 0); + Invariants.checkState(tenure.lastIndex + 1 == accessor.size(ranges) || c.compare(tenure.end, accessor.start(ranges, tenure.lastIndex + 1)) <= 0); + ++directCount; + return untenureIndex - 1; + } + + private Tenured newTenured(RoutingKey end, int index) + { + Tenured result = reuse; + if (result == null) + return new Tenured<>(accessor, end, index); + + reuse = result.next; + result.end = end; + result.index = index; + result.lastIndex = 0; + result.next = null; + return result; + } + + private Tenured addLinkEntry(RoutingKey end, int index, int lastIndex, int length) + { + Invariants.checkArgument(index < 0); + Tenured result = newTenured(end, index); + result.linkLength = length; + result.lastIndex = lastIndex; + add(result); + return result; + } + + /** + * Retire any active tenured ranges that no longer cover the pointer into ranges; + * if this crosses our checkpoint threshold, write a new checkpoint. 
+ */ + void untenure(int index) + { + while (!isEmpty() && first().lastIndex < index) + { + Tenured removed = pollFirst(); + + // if removed.next == null, this is not referenced by a link + // if removed.next == removed, it is referenced by a link but does not modify the link on removal + if (removed.next != null && removed.next != removed) + { + // this is a member of a link's chain, which may serve one of two purposes: + // 1) it may be the entry nominated to invalidate the link, due to the link + // membership shrinking below the required threshold; in which case we + // must clear the chain to reactivate its members for insertion into the + // next checkpoint, and remove the chain link itself + // 2) it may be nominated as an entry to update the chain link info, to make + // it more succinct: if every entry of the chain remains active, and there + // are *many* entries then we need two integers to represent the chain, but + // as soon as any entry is invalid we can rely on this entry to terminate + // iteration, so we update the bookkeeping on the first entry we remove in + // this case + + // first clear the chain starting at the removed entry + Tenured prev = removed, next = removed.next; + while (next.next != null) + { + prev = next; + next = next.next; + prev.next = null; + } + Invariants.checkState(next.index < 0); + if (prev.end == next.end) + { + // if this is the last entry in the link, the link is expired and should be removed/reused + remove(next); + if (pendingReuseTail == null) + pendingReuseTail = next; + next.next = pendingReuse; + pendingReuse = next; + } + else if (next.linkLength < 0) + { + // otherwise, flag the link as safely consumed without knowing the length + next.linkLength = next.linkLength & Integer.MAX_VALUE; + } + } + + // this was not a link reference; update our bookkeeping and save it for reuse + Invariants.checkState(removed.index >= 0); + --directCount; + --minSpan; + if (pendingReuseTail == null) + pendingReuseTail = removed; + 
removed.next = pendingReuse; + pendingReuse = removed; + } + } + + /** + * Write out any direct entries that are not pointed to by a chain entry, and any chain entries; + * rollover any per-checkpoint data and free up for reuse discarded Tenured objects + */ + void rollover(PendingCheckpoint pending) + { + for (Tenured tenured : this) + { + if (tenured.next == null) + pending.add(tenured); + } + // make freed Tenured objects available for reuse + if (pendingReuse != null) + { + pendingReuseTail.next = reuse; + reuse = pendingReuse; + pendingReuseTail = pendingReuse = null; + } + directCountAtPrevCheckpoint = minSpan = directCount; + } + } + + /** + * we write checkpoints out before knowing the scan distance needed for the range, as a checkpoint precedes + * the ranges it covers; so we record the position and contents of the checkpoint, and once the scan distance is + * known (i.e. when the next checkpoint is written) we re-process the list to remove items we can now scan before + * serializing to checkpointListsBuf. 
+ */ + static class PendingCheckpoint + { + int atIndex = -1; + int count; + + Tenured[] contents = new Tenured[10]; + + int openDirectCount, firstOpenDirect, openIndirectCount; + boolean hasClosedDirect; + + int count() + { + return count; + } + + Tenured get(int i) + { + return contents[i]; + } + + void add(Tenured tenured) + { + if (contents.length == count) + contents = Arrays.copyOf(contents, 2 * contents.length); + contents[count++] = tenured; + } + + void clear() + { + count = 0; + } + + /** + * Remove pending entries that will be scanned by the scanDistance, and update + * our bookkeeping for creating links + */ + int filter(int scanDistance, int lastIndex) + { + int matchCountModifier = 0; + int maxi = count; + count = 0; + openDirectCount = 0; + openIndirectCount = 0; + firstOpenDirect = -1; +// lastClosedDirect = -1; + + for (int i = 0; i < maxi ; ++i) + { + Tenured t = get(i); + if (t.index >= 0) + { + if (t.index + scanDistance >= lastIndex) + continue; // last index will find it with a scan + + if (t.lastIndex <= t.index + scanDistance) + continue; // all indexes will find it with a scan + + if (t.lastIndex > lastIndex) + { + // this range remains open for the next checkpoint; + // we may want to reference this list from there + // so track count and position of first one to make a determination + ++openDirectCount; + if (firstOpenDirect < 0) firstOpenDirect = count; + } + else hasClosedDirect = true; + } + else + { + // note: we over count here, as we count pointers within the chain + matchCountModifier += (t.linkLength & Integer.MAX_VALUE) - 1; // (subtract 1 to discount the pointer) + if (t.lastIndex > lastIndex) + ++openIndirectCount; + } + + if (i == count) ++count; + else contents[count++] = t; + } + + return count + matchCountModifier; + } + + /** + * Setup a link for referencing this chain later, if permitted. 
+ * Must have at least two items, and at least as many direct records as indirect + */ + void setupLinkChain(TenuredSet tenured, int startIndex, int endIndex) + { + int minSizeToReference = openIndirectCount + MIN_INDIRECT_LINK_LENGTH; + if (openDirectCount >= minSizeToReference) + { + int i = firstOpenDirect; + Tenured prev = get(i++); + + while (openDirectCount > minSizeToReference) + { + Tenured e = get(i++); + if (e.index < 0) + { + --minSizeToReference; + continue; + } + + Invariants.checkState(prev.next == null); + prev.next = prev; + prev = e; + --openDirectCount; + } + + while (i < count) + { + Tenured next = get(i++); + if (next.index < 0) + continue; + + Invariants.checkState(prev.next == null); + prev.next = next; + prev = next; + } + + // may be more than one entry per item (though usually not) + int length = endIndex - startIndex; + Tenured chainEntry = tenured.addLinkEntry(prev.end, BIT31 | startIndex, prev.lastIndex, length); + prev.next = chainEntry; + if (hasClosedDirect && (startIndex > 0xfffff || (length > 0xff))) + { + // TODO (expected, testing): make sure this is tested, as not a common code path (may never be executed in normal operation) + // we have no closed ranges so iteration needs to know the end bound, but we cannot encode our bounds cheaply + // so link the first bound to the chain entry, so that on removal it triggers an update of endIndex to note + // that it can be iterated safely without an end bound + get(firstOpenDirect).next = chainEntry; + } + } + } + + @Override + public String toString() + { + return Arrays.stream(contents, 0, count) + .map(Objects::toString) + .collect(Collectors.joining(",", "[", "]")); + } + } +} diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeList.java b/accord-core/src/main/java/accord/utils/SearchableRangeList.java index 22aeb3a111..65e4465fce 100644 --- a/accord-core/src/main/java/accord/utils/SearchableRangeList.java +++ 
b/accord-core/src/main/java/accord/utils/SearchableRangeList.java @@ -18,17 +18,14 @@ package accord.utils; -import accord.api.RoutingKey; import accord.primitives.Range; import accord.primitives.RoutableKey; -import accord.utils.SearchableRangeListBuilder.Links; -import accord.utils.SearchableRangeListBuilder.Strategy; +import accord.utils.CheckpointIntervalArrayBuilder.Links; +import accord.utils.CheckpointIntervalArrayBuilder.Strategy; import net.nicoulaj.compilecommand.annotations.Inline; -import java.util.*; - -import static accord.utils.SearchableRangeListBuilder.Links.LINKS; -import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE; +import static accord.utils.CheckpointIntervalArrayBuilder.Links.LINKS; +import static accord.utils.CheckpointIntervalArrayBuilder.Strategy.ACCURATE; import static accord.utils.SortedArrays.Search.*; /** @@ -79,104 +76,13 @@ * earlier checkpoints. * */ -public class SearchableRangeList +public class SearchableRangeList extends CheckpointIntervalArray { - // scan distance can be kept very small as we guarantee to use at most linear extra space even with a scan distance of zero - static final int MAX_SCAN_DISTANCE = 255; - private static final int BIT30 = 0x40000000; - private static final int BIT29 = 0x20000000; - private static final SearchableRangeList EMPTY_CHECKPOINTS = new SearchableRangeList(new Range[0], new int[0], new int[] { 0, 0 }, new int[0], 0); - final Range[] ranges; - - /** - * The lower bound for each checkpoint. - * The checkpoint {@code i} applies to all ranges (incl) starting from {@code lowerBounds[i]}, - * but before (excl) {@code lowerBounds[i+1]}. - */ - final int[] lowerBounds; - - /** - * Logically one entry per checkpoint, mapping {@link #lowerBounds} to {@link #checkpointLists}, - * however we also encode an additional byte per entry representing the scan distance for the - * ranges handled by this checkpoint. These are grouped into an integer per four mappings, i.e. 
- * we encode batches of five ints, with the first int containing the four scan distances for the - * next four checkpoints, and the following four ints containing the respective offsets into - * {@link #checkpointLists}. - *

- * [0.........32b.........64b.........96b........128b........160b........192b] - * [ d1 d2 d3 d4 mapping1 mapping2 mapping3 mapping4 d5 d6 d7 d8 ] - */ - final int[] headers; - - /** - * A list of indexes in {@link #ranges} contained by each checkpoint; checkpoints are - * mapped from {@link #lowerBounds} by {@link #headers}. - *

- * Entries are sorted in descending order by the end of the range they cover, so that - * a search of this collection my terminate as soon as it encounters a range that does - * not cover the item we are searching for. - *

- * This collection may contain negative values, in which case these point to other - * checkpoints, whose direct contents (i.e. the positive values of) we may - * search. - *

<ul> if negative, points to an earlier checkpoint, and: - * <li>if the 30th bit is set, the low 20 bits point to checkpointsList, - * and the 9 bits in-between provide the length of the range</li> - * <li>otherwise, if the 29th bit is set, the lower 29 bits points to checkpointsList, - * and can be iterated safely without an endIndex</li> - * <li>otherwise, the low 29 bits provide the length of the run, and the low 31 bits - * of the following entry (which will also be negative) provide a pointer to - * checkpointsList</li> - * </ul>
- */ - final int[] checkpointLists; - - public final int maxScanAndCheckpointMatches; - - SearchableRangeList(Range[] ranges, int[] lowerBounds, int[] headers, int[] checkpointLists, int maxScanAndCheckpointMatches) + public SearchableRangeList(Range[] ranges, int[] lowerBounds, int[] headers, int[] checkpointLists, int maxScanAndCheckpointMatches) { - this.ranges = ranges; - this.lowerBounds = lowerBounds; - this.headers = headers; - this.checkpointLists = checkpointLists; - this.maxScanAndCheckpointMatches = maxScanAndCheckpointMatches; - } - - @Inline - public int forEach(Range range, IndexedQuadConsumer forEachScanOrCheckpoint, IndexedRangeQuadConsumer forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex) - { - return forEach(range.start(), range.end(), forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex); - } - - public int forEach(RoutingKey startKey, RoutingKey endKey, IndexedQuadConsumer forEachScanOrCheckpoint, IndexedRangeQuadConsumer forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex) - { - if (ranges.length == 0 || minIndex == ranges.length) - return minIndex; - - int end = SortedArrays.binarySearch(ranges, minIndex, ranges.length, endKey, (a, b) -> a.compareTo(b.start()), CEIL); - if (end < 0) end = -1 - end; - if (end <= minIndex) return minIndex; - - int floor = SortedArrays.binarySearch(ranges, minIndex, ranges.length, startKey, (a, b) -> a.compareTo(b.start()), CEIL); - int start = floor; - if (floor < 0) - { - // if there's no precise match on start, step backwards; - // if this range does not overlap us, step forwards again for start - // but retain the floor index for performing scan and checkpoint searches from - // as this contains all ranges that might overlap us (whereas those that end - // after us but before the next range's start would be missed by the next range index) - start = floor = -2 - floor; - if (start < 0) - start = floor = 0; - else if (ranges[start].end().compareTo(startKey) <= 0) - ++start; - } - - // Since 
endInclusive() != startInclusive(), so no need to adjust start/end comparisons - return forEach(start, end, floor, startKey, 0, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex); + super(SearchableRangeListBuilder.RANGE_ACCESSOR, ranges, lowerBounds, headers, checkpointLists, maxScanAndCheckpointMatches); } @Inline @@ -209,95 +115,6 @@ else if (ranges[start].compareEndTo(key) < 0) return forEach(start, end, floor, key, bound, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex); } - @Inline - private int forEach(int start, int end, int floor, RoutableKey startBound, int cmpStartBoundWithEnd, - IndexedQuadConsumer forEachScanOrCheckpoint, IndexedRangeQuadConsumer forEachRange, - P1 p1, P2 p2, P3 p3, P4 p4, int minIndex) - { - if (start < minIndex) start = minIndex; - - // find the checkpoint array, so we know how far to step back - int checkpoint = Arrays.binarySearch(lowerBounds, floor); - if (checkpoint < 0) checkpoint = -2 - checkpoint; - if (checkpoint < 0) return end; - - int headerBaseIndex = (checkpoint / 4) * 5; - int headerSubIndex = checkpoint & 3; - int headerListIndex = headerBaseIndex + 1 + headerSubIndex; - - int scanDistance = (headers[headerBaseIndex] >>> (8 * headerSubIndex)) & 0xff; - int checkpointStart = headers[headerListIndex]; - int checkpointEnd = headers[headerListIndex + (headerSubIndex + 5)/4]; // skip the next header - - if (scanDistance == MAX_SCAN_DISTANCE) - { - scanDistance = -checkpointLists[checkpointStart++]; - Invariants.checkState(scanDistance >= MAX_SCAN_DISTANCE); - } - - // NOTE: we visit in approximately ascending order, and this is a requirement for correctness of RangeDeps builders - // Only the checkpoint is visited in uncertain order, but it is visited entirely, before the scan matches - // and the range matches - int minScanIndex = Math.max(floor - scanDistance, minIndex); - for (int i = checkpointStart; i < checkpointEnd ; ++i) - { - int ri = checkpointLists[i]; - if (ri < 0) - { - int 
subStart, subEnd; - if ((ri & BIT30) != 0) - { - subStart = ri & 0xfffff; - subEnd = subStart + ((ri >>> 20) & 0x1ff); - } - else if ((ri & BIT29) != 0) - { - subStart = ri & 0x1fffffff; - subEnd = Integer.MAX_VALUE; - } - else - { - int length = ri & 0x1fffffff; - subStart = checkpointLists[++i]; - subEnd = subStart + length; - } - - for (int j = subStart ; j < subEnd ; ++j) - { - ri = checkpointLists[j]; - if (ri < 0) - continue; - - if (ranges[ri].end().compareTo(startBound) <= cmpStartBoundWithEnd) - break; - - if (ri >= minIndex && ri < minScanIndex) - forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri); - } - } - else - { - // if startBound is key, we cannot be equal to it; - // if startBound is a Range start, we also cannot be equal to it due to the requirement that - // endInclusive() != startInclusive(), so equality really means inequality - if (ranges[ri].end().compareTo(startBound) <= cmpStartBoundWithEnd) - break; - - if (ri >= minIndex && ri < minScanIndex) - forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri); - } - } - - for (int i = minScanIndex; i < floor ; ++i) - { - if (ranges[i].end().compareTo(startBound) > cmpStartBoundWithEnd) - forEachScanOrCheckpoint.accept(p1, p2, p3, p4, i); - } - - forEachRange.accept(p1, p2, p3, p4, start, end); - return end; - } - public static SearchableRangeList build(Range[] ranges) { if (ranges.length == 0) diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java index 77d351a7f9..f25c7867fb 100644 --- a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java +++ b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java @@ -18,1075 +18,90 @@ package accord.utils; -import accord.api.RoutingKey; -import accord.primitives.Range; -import accord.utils.ArrayBuffers.IntBuffers; +import java.util.Comparator; -import javax.annotation.Nonnull; -import java.util.Arrays; -import java.util.Objects; 
-import java.util.TreeSet; -import java.util.stream.Collectors; +import accord.primitives.Range; +import accord.primitives.RoutableKey; -import static accord.utils.ArrayBuffers.cachedInts; import static accord.utils.SearchableRangeList.MAX_SCAN_DISTANCE; -import static accord.utils.SearchableRangeListBuilder.Links.LINKS; -import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE; -import static accord.utils.SortedArrays.Search.CEIL; /** * Builder for {@link SearchableRangeList} */ -public class SearchableRangeListBuilder +public class SearchableRangeListBuilder extends CheckpointIntervalArrayBuilder { - public enum Strategy - { - /** - * Do not tenure any ranges that are scannable from the currently in-effect max scan distance. - * This means we probably do less work on construction, but that our measure of the match count - * at any point is inaccurate, and so our heuristics for when to write checkpoints may be wrong, - * leading to more checkpoints than necessary. - */ - FAST, - - /** - * Tenure every range covering more than goalScanDistance. Any within max scan distance will also - * update the scan distance, so that they will be filtered when a checkpoint is written. But - * in the meantime they permit accurate tracking of the number of matches a query can return, - * permitting our complexity calculations (that determine when checkpoints should be written, and - * what our maximum scan distance should be), to be accurate. This can avoid bouncing between - * two extremes, where a low max scan distance tenures and correctly detects a desirable larger scan - * distance, which we rollover and this prevents us tenuring and tracking the number of matches, so - * that we then pick a low max scan distance (and thereby also write a new checkpoint) - */ - ACCURATE - } - - /** - * Should we maintain pointers to prior checkpoints that we may reference instead of reserializing - * the remaining contents. 
This is cheap to visit as we stop enumerating as soon as we encounter - * an entry that no longer covers us. We use some simple heuristics when deciding whether to do - * this, namely that there are at least two entries (so we save one checkpoint entry) and that - * there is at least one direct entry for each indirect/link entry in the range we will link. - */ - public enum Links - { - LINKS, - NO_LINKS - } - - private static final int BIT31 = 0x80000000; - private static final int BIT30 = 0x40000000; - private static final int BIT29 = 0x20000000; - static final int MIN_INDIRECT_LINK_LENGTH = 2; - - final boolean isAccurate; - final boolean withLinks; - final Range[] ranges; - - int[] bounds; - int[] headers; - int[] lists; - int checkpointCount, headerPointer, listCount; - - final Scan scan = new Scan(); - final TenuredSet tenured = new TenuredSet(); - final PendingCheckpoint pending = new PendingCheckpoint(); - - // track the maximum possible number of entries we can match with both a scan + checkpoint lookup - // this is an over-estimate and may be used by consumers to allocate out-of-order buffers for visitations - int maxScanAndCheckpointMatches; - - public SearchableRangeListBuilder(Range[] ranges, Strategy strategy, Links links) - { - this(ranges, Math.min(MAX_SCAN_DISTANCE, 34 - Integer.numberOfLeadingZeros(ranges.length)), strategy, links); - } - - public SearchableRangeListBuilder(Range[] ranges, int goalScanDistance, Strategy strategy, Links links) - { - this.isAccurate = strategy == ACCURATE; - this.withLinks = links == LINKS; - Invariants.checkArgument(goalScanDistance <= MAX_SCAN_DISTANCE); - Invariants.checkArgument(goalScanDistance > 0); - this.ranges = ranges; - init(ranges, goalScanDistance); - } - - void init(Range[] ranges, int goalScanDistance) - { - // we write checkpoints at least goalScanDistance apart - scan.init(goalScanDistance); - IntBuffers cachedInts = cachedInts(); - // ask for int buffers in descending order of size - this.lists = 
cachedInts.getInts(ranges.length); // this one might need to grow - // +2 to round-up each division, and +2 to account for the final entry (which might require an empty scan distance header) - this.headers = cachedInts.getInts(((ranges.length / goalScanDistance) * 5) / 4 + 4); - this.bounds = cachedInts.getInts(ranges.length / goalScanDistance + 1); - } - - /** - * Walk over each range, looking ahead by {@link #maxScanDistance} to decide if a range should - * be tenured (written to a checkpoint) or scanned; the maximum scan distance is determined by the - * number of open tenured entries, i.e. the minimum number of results we can expect to be returned - * (or, if greater, the logarithm of the number of ranges in the collection). - *

- * Once we encounter a range that should be tenured, either write a checkpoint immediately - * or make a note of the position we must scan to from the last entry in this checkpoint - * and wait until it is permitted to write a checkpoint. This range will be tenured either - * way for the following checkpoint. - *

- * The only reason not to write a checkpoint immediately is in the case we would breach - * our linear space complexity limit, which is imposed by ensuring we have a space between - * checkpoints at least as large as the number of entries written to the last checkpoint, - * discounted by the number of entries we have removed from the tenured collection since - * the last checkpoint. - */ - public SearchableRangeList build() - { - for (int ri = 0 ; ri < ranges.length ; ++ri) - { - // write a checkpoint if we meet our linear space complexity requirements - // and we either have a tenured range that we must scan, - // or the scan distance is now much larger than the minimum number of search results - if (shouldWriteCheckpoint(ri)) - writeCheckpoint(ri); - - // either tenure or update scan distance, potentially writing a checkpoint - tenureOrScan(ri); - tenured.untenure(ri); - } - - // write our final pending checkpoint - writeCheckpoint(ranges.length); - closeHeaders(); - - IntBuffers cachedInts = cachedInts(); - int[] lists = cachedInts.completeAndDiscard(this.lists, listCount); - int[] headers = cachedInts.completeAndDiscard(this.headers, headerPointer); - int[] bounds = cachedInts.completeAndDiscard(this.bounds, checkpointCount); - return new SearchableRangeList(ranges, bounds, headers, lists, maxScanAndCheckpointMatches); - } - - /** - * Categorise the candidateIdx as either scannable, and if so update the scan distance; - * or unscannable, in which case add it to the {@link #tenured} collection. - * Note, that in ACCURATE mode we tenure the item if it is outside of the goalScanDistance - * so we may track O(k) accurately above the O(lg2(N)) search and default scan distance, - * but we still update the scan distance so that the checkpoint will exclude this entry. 
- */ - private void tenureOrScan(int index) - { - Invariants.checkArgument(index >= 0); - - // then either migrate the index to pendingTenured, or ensure it will be scanned - RoutingKey end = ranges[index].end(); - int scanLimit = scanLimit(index, isAccurate ? scan.goal : maxScanDistance()); - if (shouldTenure(end, scanLimit)) - { - int lastIndex = tenured.tenure(end, index, ranges, scanLimit + 1); - if (lastIndex - index > maxScanDistance()) scan.tenured(index); - else if (!isAccurate) throw new IllegalStateException(); - else scan.updateScanDistance(index, lastIndex - index, this); - } - else - { - // TODO (low priority, efficiency): if the prior checkpoint has a scan distance >= this one, - // and <= 50% more than this one and there's no scanMustReachIndex nor tenuredRanges, don't - // write a new checkpoint (perhaps split shouldWriteCheckpoint logic in two) - scan.update(end, index, ranges, scanLimit, this); - } - } - - /** - * We are forbidden from writing a checkpoint nearer than this to a prior checkpoint. - * This imposes our linear space complexity bounds, while not harming our O(log2(N) + K) - * complexity bounds, as we guarantee minimumSpan is never more than the number of query - * results. - */ - private int minimumSpan() - { - return Math.max(scan.goal(), tenured.minimumSpan()); - } - - private int maxScanDistance() - { - // minimumSpan() reduces overtime, but there is no reason to reduce our scan distance - // for tenuring below the scan distance we will write - return Math.max(scan.watermark(), minimumSpan()); - } - - /** - * The index after the last index we can scan from {@code atIndex} with at most {@code maxScanDistance}. 
- */ - private int scanLimit(int atIndex, int maxScanDistance) - { - return Math.min(1 + atIndex + maxScanDistance, ranges.length); - } - - private boolean shouldTenure(RoutingKey end, int scanLimit) - { - return scanLimit < ranges.length && end.compareTo(ranges[scanLimit].start()) > 0; - } - - private boolean canWriteCheckpoint(int atIndex) - { - return atIndex - pending.atIndex >= minimumSpan(); - } - - private boolean shouldWriteCheckpoint(int atIndex) - { - if (!canWriteCheckpoint(atIndex)) - return false; - - // TODO (desired, efficiency): consider these triggers - if (scan.mustCheckpointToScanTenured(atIndex, maxScanDistance())) - return true; - - return scan.hasMaybeDivergedFromMatchSize(tenured); - } - - /** - * Write a checkpoint for ranges[prevCheckpointIndex...ri) - * - * 1) Finalise the scan distance - * 2) Write the header - * 3) Filter the pending tenured ranges to remove those we can scan - * 4) Write this list out - * 5) Setup a link to this list, if it is large enough - * 6) Rollover the scan, tenured and pending structures for the new pending checkpoint - */ - private void writeCheckpoint(int nextCheckpointIndex) + public static final Accessor RANGE_ACCESSOR = new Accessor<>() { - int lastIndex = nextCheckpointIndex - 1; - int scanDistance = scan.finalise(lastIndex); - scanDistance = extendScanDistance(lastIndex, scanDistance); - - if (pending.atIndex < 0) - { - // we don't have any checkpoints pending, so don't try to finalise it - // but if the new checkpoint doesn't cover index 0, insert a new empty - // checkpoint for the scan distance - if (nextCheckpointIndex > 0) - { - // setup an initial empty checkpoint to store the first scan distance - maxScanAndCheckpointMatches = scanDistance; - writeHeader(scanDistance, 0); - } - } - else - { - writeHeader(scanDistance, pending.atIndex); - int maxCheckpointMatchCount = pending.filter(scanDistance, lastIndex); - int listIndex = writeList(pending); - if (withLinks) - pending.setupLinkChain(tenured, 
listIndex, listCount); - maxScanAndCheckpointMatches = Math.max(maxScanAndCheckpointMatches, scanDistance + maxCheckpointMatchCount); - } - - savePendingCheckpointAndResetScanDistance(nextCheckpointIndex); - } - - private void savePendingCheckpointAndResetScanDistance(int checkpointIndex) - { - // use the tail of checkpointListBuf to buffer ranges we plan to tenure - ensureCapacity(tenured.count() + scan.watermark()); - - scan.reset(); - - if (isAccurate) - { - // TODO (low priority, efficiency): we can shift back the existing scanDistance if it's far enough from - // the next checkpoint. this might permit us to skip some comparisons - scan.resetPeakMax(tenured); - for (Tenured tenured : this.tenured) - { - int distanceToEnd = (tenured.lastIndex - checkpointIndex); - if (distanceToEnd >= scan.peakMax) - break; - - int scanDistance = tenured.lastIndex - tenured.index; - if (scanDistance <= scan.peakMax) - scan.updateScanDistance(tenured.index, scanDistance, null); - } - - if (scan.watermark() < scan.goal) - { - int ri = Scan.minScanIndex(checkpointIndex, scan.goal); - while (ri < checkpointIndex) - { - RoutingKey end = ranges[ri].end(); - int scanLimit = scanLimit(ri, scan.peakMax); - if (!shouldTenure(end, scanLimit)) - scan.update(end, ri, ranges, scanLimit, null); - ++ri; - } - } - } - else - { - // the maximum scan distance that could ever have been adopted for last chunk - int oldPeakMax = scan.peakMax(); - // the minimum scan distance we will start with for processing the proceeding ranges - // note: this may increase if we decide to tenure additional ranges, at which point it will be the actual newPeakMax - int newMinPeakMax = scan.newPeakMax(tenured); - int minUntenuredIndex = scan.minUntenuredIndex(checkpointIndex, tenured); - int minScanIndex = Scan.minScanIndex(checkpointIndex, newMinPeakMax); - - // we now make sure tenured and scan are correct for the new parameters. 
- // 1) if our peakMax is lower then we need to go back and find items to tenure that we previously marked for scanning - // 2) we must also reset our scan distances - - // since our peakMax is determined by tenured.count(), but we are tenuring items here we keep things simple - // and do not account for those items we tenure but would later permit to scan as our peakMax grows - - int ri = Math.min(minUntenuredIndex, minScanIndex); - while (ri < checkpointIndex) - { - RoutingKey end = ranges[ri].end(); - int newPeakMax = scan.newPeakMax(tenured); - int scanLimit = scanLimit(ri, newPeakMax); - if (shouldTenure(end, scanLimit)) - { - // note: might have already been tenured - // in this case our untenureLimit may be incorrect, but we won't use it - if (ri >= minUntenuredIndex && newPeakMax < oldPeakMax) - tenured.tenure(end, ri, ranges, scanLimit + 1, scanLimit(ri, oldPeakMax)); - } - else - { - // this might effectively remove a previously tenured item - scan.update(end, ri, ranges, scanLimit, null); - } - ++ri; - } - - scan.resetPeakMax(tenured); - } - - pending.atIndex = checkpointIndex; - pending.clear(); - tenured.rollover(pending); - } - - private int extendScanDistance(int lastIndex, int scanDistance) - { - // now we've established our lower bound on scan distance, see how many checkpoints we can remove - // by increasing our scan distance so that it remains proportional to the number of results returned - // TODO (low priority, efficiency): can reduce cost here by using scanDistances array for upper bounds to scan distance - int maxScanDistance = scan.goal() + 2 * Math.min(tenured.count(), tenured.countAtPrevCheckpoint()); - if (maxScanDistance >= 1 + scanDistance + scanDistance/4 && pending.count() >= (maxScanDistance - scanDistance)/2) - { - int removeCount = 0; - int extendedScanDistance = scanDistance; - int target = (maxScanDistance - scanDistance)/2; - for (int i = 0 ; i < pending.count() ; ++i) - { - Tenured t = pending.get(i); - if (t.index < 0) - 
continue; - - int distance = Math.min(lastIndex, t.lastIndex) - t.index; - if (distance <= scanDistance) - continue; // already scanned or untenured - - if (distance <= maxScanDistance) - { - ++removeCount; - extendedScanDistance = Math.max(extendedScanDistance, distance); - if (extendedScanDistance == maxScanDistance && removeCount >= target) - break; - } - } - - // TODO (low priority, efficiency): should perhaps also gate this decision on the span we're covering - // algorithmically, however, so long as we are under maxScanDistance we are fine - if (removeCount >= (extendedScanDistance - scanDistance)/2) - scanDistance = extendedScanDistance; - } - return scanDistance; - } - - int writeList(PendingCheckpoint pending) - { - int startIndex = listCount; - for (int i = pending.count() - 1 ; i >= 0 ; --i) - { - Tenured t = pending.get(i); - if (t.index >= 0) - { - lists[listCount++] = t.index; - } - else - { - int index = t.index & ~BIT31; - int length = t.linkLength & ~BIT31; - if (length <= 0xff && index <= 0xfffff) - { - lists[listCount++] = BIT31 | BIT30 | (length << 20) | index; - } - else if (t.linkLength >= 0 && length < BIT30) - { - lists[listCount++] = BIT31 | BIT29 | index; - } - else - { - lists[listCount++] = BIT31 | length; - lists[listCount++] = BIT31 | pending.count(); - } - } - } - return startIndex; - } - - void writeHeader(int scanDistance, int lowerBound) - { - int headerScanDistance = Math.min(scanDistance, MAX_SCAN_DISTANCE); - - if ((checkpointCount & 3) == 0) - headers[headerPointer++] = headerScanDistance; - else - headers[headerPointer - (1 + (checkpointCount & 3))] |= headerScanDistance << (8 * (checkpointCount & 3)); - - bounds[checkpointCount++] = lowerBound; - headers[headerPointer++] = listCount; - - if (scanDistance >= MAX_SCAN_DISTANCE) - lists[listCount++] = -scanDistance; // serialize as a negative value so we ignore it in most cases automatically - } - - void closeHeaders() - { - // write our final checkpoint header - if 
((checkpointCount & 3) == 0) headers[headerPointer++] = 0; - headers[headerPointer++] = listCount; - } - - void ensureCapacity(int maxPendingSize) - { - if (listCount + maxPendingSize >= lists.length) - lists = cachedInts().resize(lists, listCount, lists.length + lists.length/2 + maxPendingSize); - } - - static class Scan - { - /** the scan distance we are aiming for; should be proportional to log2(N) */ - int goal; - - /** the indexes at which we increased the scan distance, and the new scan distance */ - int[] distances = new int[16]; - /** the number of unique scan distances we have adopted since the last checkpoint */ - int count; - /** the highest scan distance we have adopted (==scanDistance(scanDistanceCount-1)) */ - int watermark; - /** - * the first index we have tenured a range from, but for which we did not immediately write a new checkpoint - * we *must* scan at least from the last index in the checkpoint to here - */ - int scanTenuredAtIndex = -1; - - /** The maximum (i.e. initial) scan distance limit we have used since the last attempted checkpoint write */ - int peakMax; - - void init(int goalScanDistance) - { - goal = peakMax = goalScanDistance; - } - - private void update(RoutingKey end, int atIndex, Range[] ranges, int scanLimit, SearchableRangeListBuilder checkpoint) - { - int newScanDistance = find(end, atIndex, ranges, scanLimit, watermark); - updateScanDistance(atIndex, newScanDistance, checkpoint); - } - - private void updateScanDistance(int atIndex, int newScanDistance, SearchableRangeListBuilder checkpoint) - { - if (newScanDistance > watermark) - { - // TODO (desired, efficiency): we don't mind slight increases to the watermark; - // should really look at scan distance history and ensure we haven't e.g. 
doubled since - // some earlier point (and should track the match count + scan distance at each bump - // to check overall work hasn't increased too much) - if (checkpoint != null && checkpoint.canWriteCheckpoint(atIndex)) - checkpoint.writeCheckpoint(atIndex); - - watermark = newScanDistance; - if (count * 2 == distances.length) - distances = Arrays.copyOf(distances, distances.length * 2); - distances[count * 2] = newScanDistance; - distances[count * 2 + 1] = atIndex; - ++count; - } - } - - private int find(RoutingKey end, int atIndex, Range[] ranges, int scanLimit, int currentScanDistance) - { - int lowerIndex = SortedArrays.exponentialSearch(ranges, atIndex + currentScanDistance, scanLimit, end, (e, s) -> e.compareTo(s.start()), CEIL); - if (lowerIndex < 0) lowerIndex = -2 - lowerIndex; - else lowerIndex -= 1; - return lowerIndex - atIndex; - } - - boolean isAboveGoal() - { - return watermark > goal; - } - - int watermark() - { - return watermark; - } - - int goal() - { - return goal; - } - - int distanceToTenured(int lastIndex) - { - return scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0; - } - - boolean mustCheckpointToScanTenured(int checkpointIndex, int maxScanDistance) - { - return scanTenuredAtIndex >= 0 && checkpointIndex - scanTenuredAtIndex >= maxScanDistance; - } - - /** - * Are we scanning a much longer distance than the minimum number of matches we know a query will return? 
- * Note: with Strategy.FAST, {@code tenured.count()} gets less accurate as scan distance increases, so this - * will bounce around triggering checkpoints due to the larger scan distance, resetting the scan distance - * and starting again - */ - boolean hasMaybeDivergedFromMatchSize(TenuredSet tenured) - { - return isAboveGoal() && tenured.count() < watermark()/2; - } - - private int distance(int i) - { - return distances[i*2]; - } - - private int index(int i) - { - return distances[i*2+1]; - } - - int finalise(int lastIndex) - { - Invariants.checkState(distanceToTenured(lastIndex) <= Math.max(watermark(), peakMax())); - - int scanDistance = watermark; - // then, compute the minimum scan distance implied by any tenured ranges we did not immediately - // write a checkpoint for - we *must* scan back as far as this record - int minScanDistance = scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0; - if (minScanDistance > scanDistance) - { - // if this minimum is larger than the largest scan distance we picked up for non-tenured ranges - // then we are done, as there's nothing we can save - scanDistance = minScanDistance; - } - else if (scanDistance > 0) - { - // otherwise, we can look to see if any of the scan distances we computed overflow the checkpoint, - // i.e. 
where no records served by this checkpoint need to scan the full distance to reach it - int distanceToLastScanIndex = lastIndex - index(count -1); - // if the distance to the last scan index is larger than its scan distance, we have overflowed; - if (distanceToLastScanIndex < scanDistance) - { - minScanDistance = Math.max(distanceToLastScanIndex, minScanDistance); - // loop until we find one that doesn't overflow, as this is another minimum scan distance - int i = count - 1; - while (--i >= 0) - { - int distance = lastIndex - index(i); - if (distance >= distance(i)) break; - else if (distance > minScanDistance) minScanDistance = distance; - } - if (i >= 0) scanDistance = Math.max(minScanDistance, distance(i)); - else scanDistance = minScanDistance; - } - } - - return scanDistance; - } - - void reset() - { - // we could in theory reset our scan distance using the contents of scanDistance[] - // but it's a bit complicated, as we want to have the first item to increment the scan distance - // so that we can use it in writeScanDistance to shrink the scan distance; - // jumping straight to the highest scan distance breaks this - count = 0; - scanTenuredAtIndex = -1; - watermark = 0; - } - - void resetPeakMax(TenuredSet tenured) - { - peakMax = newPeakMax(tenured); - } - - int peakMax() + @Override + public boolean endInclusive(Range[] ranges) { - return peakMax; + return ranges[0].endInclusive(); } - int newPeakMax(TenuredSet tenured) + @Override + public int size(Range[] ranges) { - return Math.max(goal, tenured.count()); + return ranges.length; } - /** - * The minimum index containing a range that might need to be tenured, if we have a smaller max scan distance than before - */ - int minUntenuredIndex(int checkpointIndex, TenuredSet tenured) + @Override + public Range get(Range[] ranges, int index) { - int minUntenuredIndex = Math.max(0, (checkpointIndex - 1) - watermark()); - // the maximum scan distance that cxould ever have been adopted for the ranges processed 
since last checkpoint - int oldPeakMax = peakMax; - int newMinPeakMax = newPeakMax(tenured); - if (newMinPeakMax < oldPeakMax) - { - // minimise range we unnecessarily re-tenure over - // TODO (low priority, efficiency): see if can also use to reduce range we re-scan e.g. can recycle - // scanDistances contents if we know we won't need to step back further at next checkpoint - for (int i = count - 1; i >= 0 ; --i) - { - if (index(i) <= minUntenuredIndex) - break; - if (distance(i) <= newMinPeakMax) - return i + 1 == count ? index(i) : index(i + 1) - 1; - } - } - return minUntenuredIndex; + return ranges[index]; } - /** - * Record that a range at this index has been tenured, so that we can track how far back - * we need to scan to determine how long we can defer writing a new checkpoint while still - * being able to scan it. - * - * TODO (low priority, efficiency): when a checkpoint is written, we should consider moving it - * earlier if the scan distance is increased primarily because of this index, and the tenured - * collection is otherwise unchanged (so can be written with minimal overhead) - */ - void tenured(int atIndex) + @Override + public RoutableKey start(Range[] ranges, int index) { - if (scanTenuredAtIndex < 0) - scanTenuredAtIndex = atIndex; + return ranges[index].start(); } - static int minScanIndex(int checkpointIndex, int scanDistance) + @Override + public RoutableKey start(Range range) { - return Math.max(0, (checkpointIndex - 1) - scanDistance); + return range.start(); } @Override - public String toString() + public RoutableKey end(Range[] ranges, int index) { - return "Scan{watermark=" + watermark + ", tenured=" + scanTenuredAtIndex + '}'; + return ranges[index].end(); } - } - - /** - * Record-keeping for a range we have decided is not scannable - */ - static class Tenured implements Comparable - { - /** - * The end of the tenured range covered by the contents referred to be {@link #index} - */ - RoutingKey end; - - /** - *

<ul>
- * <li>If positive, this points to {@code ranges[index]}</li>
- * <li>If negative, this points to an entry in {@link #lists};
- * see {@link SearchableRangeList#checkpointLists}</li>
- * </ul>
- */ - int index; - - /** - * The last index in {@link #ranges} covered by this tenured range - */ - int lastIndex; - - /** - * set when this record is serialized in a checkpoint list to either: - *
<ul>
- * <li>point to itself, in which case no action should be
- * taken on removal (it is only retained for size bookkeeping); or</li>
- * <li>point to the next item in the checkpoint list; the first
- * such element removed triggers the clearing of the checkpoint
- * list so that its entries are re-inserted in the next checkpoint</li>
- * </ul>
- */ - Tenured next; - /** - * Only set for link entries, i.e. where {@code index < 0}. - *
<ul>
- * <li>if positive, the length is optional as we will terminate safely using the end bound filtering</li>
- * <li>if negative, the low 31 bits must be retrieved as the length for safe iteration</li>
- * </ul>
- */ - int linkLength; - - Tenured(RoutingKey end, int index) + @Override + public RoutableKey end(Range range) { - this.end = end; - this.index = index; + return range.end(); } @Override - public int compareTo(@Nonnull Tenured that) + public Comparator keyComparator() { - int c = this.end.compareTo(that.end); - // we sort indexes in reverse order so later tenured items find the earlier ones with same end when searching - // for higher entries for the range of indexes to search, and - if (c == 0) c = -Integer.compare(this.index, that.index); - return c; + return Comparator.naturalOrder(); } @Override - public String toString() + public int binarySearch(Range[] ranges, int from, int to, RoutableKey find, AsymmetricComparator comparator, SortedArrays.Search op) { - return "Tenured{end=" + end + ", index=" + index + '}'; + return SortedArrays.binarySearch(ranges, from, to, find, comparator, op); } - } + }; - /** - * The set of ranges that we intend to write to checkpoints that remain open at the current point in the iteration - * This collection may be filtered before serialization, but every member will be visited either by scanning - * or visiting the checkpoint list - * TODO (low priority, efficiency): save garbage by using an insertion-sorted array for collections where - * this is sufficient. later, introduce a mutable b-tree supporting object recycling. we would also like - * to use a collection that permits us to insert and return a finger into the tree so we can find the - * successor as part of insertion, and that permits constant-time first() calls - */ - static class TenuredSet extends TreeSet + public SearchableRangeListBuilder(Range[] ranges, Strategy strategy, Links links) { - /** - * the number of direct tenured entries (i.e. 
ignoring link entries) - * this is used to provide a minimum bound on the number of results a range query can return - * note: with Strategy.FAST this gets less accurate as the span distance increases - */ - int directCount; - int directCountAtPrevCheckpoint; - int minSpan; - - // a stack of recently used EndAndIndex objects - used only for the duration of a single build - Tenured reuse, pendingReuse, pendingReuseTail; - - int count() - { - return directCount; - } - - int countAtPrevCheckpoint() - { - return directCountAtPrevCheckpoint; - } - - /** - * We require a checkpoint to cover a distance at least as large as the number of tenured ranges leftover - * since the prior checkpoint, to ensure these require at most linear additional space, while not requiring - * more than O(k) additional complexity on search (i.e., we will scan a number of elements at most equal - * to the number we have to visit in the checkpoint). - * - * We achieve this by recording the minimum number of match results as of the prior checkpoint (i.e. {@link #count()}) - * and discounting it by one each time we untenure a range, so that for each tenured range from the prior checkpoint - * we have either untenured a range or processed at least one additional input. 
- */ - int minimumSpan() - { - return minSpan; - } - - private int tenure(RoutingKey end, int index, Range[] ranges, int minUntenureIndex) - { - return tenure(newTenured(end, index), ranges, minUntenureIndex, ranges.length); - } - - private void tenure(RoutingKey end, int index, Range[] ranges, int minUntenureIndex, int untenureLimit) - { - tenure(newTenured(end, index), ranges, minUntenureIndex, untenureLimit); - } - - private int tenure(Tenured tenure, Range[] ranges, int untenureMinIndex, int untenureLimit) - { - if (!add(tenure)) - return tenure.lastIndex; - - Tenured next = higher(tenure); - if (next != null) - untenureLimit = Math.min(untenureLimit, next.lastIndex + 1); - int untenureIndex = SortedArrays.binarySearch(ranges, untenureMinIndex, untenureLimit, tenure.end, (e, s) -> e.compareTo(s.start()), CEIL); - if (untenureIndex < 0) untenureIndex = -1 - untenureIndex; - tenure.lastIndex = untenureIndex - 1; - Invariants.checkState(tenure.end.compareTo(ranges[tenure.lastIndex].start()) > 0); - Invariants.checkState(tenure.lastIndex + 1 == ranges.length || tenure.end.compareTo(ranges[tenure.lastIndex + 1].start()) <= 0); - ++directCount; - return untenureIndex - 1; - } - - private Tenured newTenured(RoutingKey end, int index) - { - Tenured result = reuse; - if (result == null) - return new Tenured(end, index); - - reuse = result.next; - result.end = end; - result.index = index; - result.lastIndex = 0; - result.next = null; - return result; - } - - private Tenured addLinkEntry(RoutingKey end, int index, int lastIndex, int length) - { - Invariants.checkArgument(index < 0); - Tenured result = newTenured(end, index); - result.linkLength = length; - result.lastIndex = lastIndex; - add(result); - return result; - } - - /** - * Retire any active tenured ranges that no longer cover the pointer into ranges; - * if this crosses our checkpoint threshold, write a new checkpoint. 
- */ - void untenure(int index) - { - while (!isEmpty() && first().lastIndex < index) - { - Tenured removed = pollFirst(); - - // if removed.next == null, this is not referenced by a link - // if removed.next == removed, it is referenced by a link but does not modify the link on removal - if (removed.next != null && removed.next != removed) - { - // this is a member of a link's chain, which may serve one of two purposes: - // 1) it may be the entry nominated to invalidate the link, due to the link - // membership shrinking below the required threshold; in which case we - // must clear the chain to reactivate its members for insertion into the - // next checkpoint, and remove the chain link itself - // 2) it may be nominated as an entry to update the chain link info, to make - // it more succinct: if every entry of the chain remains active, and there - // are *many* entries then we need two integers to represent the chain, but - // as soon as any entry is invalid we can rely on this entry to terminate - // iteration, so we update the bookkeeping on the first entry we remove in - // this case - - // first clear the chain starting at the removed entry - Tenured prev = removed, next = removed.next; - while (next.next != null) - { - prev = next; - next = next.next; - prev.next = null; - } - Invariants.checkState(next.index < 0); - if (prev.end == next.end) - { - // if this is the last entry in the link, the link is expired and should be removed/reused - remove(next); - if (pendingReuseTail == null) - pendingReuseTail = next; - next.next = pendingReuse; - pendingReuse = next; - } - else if (next.linkLength < 0) - { - // otherwise, flag the link as safely consumed without knowing the length - next.linkLength = next.linkLength & Integer.MAX_VALUE; - } - } - - // this was not a link reference; update our bookkeeping and save it for reuse - Invariants.checkState(removed.index >= 0); - --directCount; - --minSpan; - if (pendingReuseTail == null) - pendingReuseTail = removed; - 
removed.next = pendingReuse; - pendingReuse = removed; - } - } - - /** - * Write out any direct entries that are not pointed to by a chain entry, and any chain entries; - * rollover any per-checkpoint data and free up for reuse discarded Tenured objects - */ - void rollover(PendingCheckpoint pending) - { - for (Tenured tenured : this) - { - if (tenured.next == null) - pending.add(tenured); - } - // make freed Tenured objects available for reuse - if (pendingReuse != null) - { - pendingReuseTail.next = reuse; - reuse = pendingReuse; - pendingReuseTail = pendingReuse = null; - } - directCountAtPrevCheckpoint = minSpan = directCount; - } + super(RANGE_ACCESSOR, ranges, strategy, links); } - /** - * we write checkpoints out before knowing the scan distance needed for the range, as a checkpoint precedes - * the ranges it covers; so we record the position and contents of the checkpoint, and once the scan distance is - * known (i.e. when the next checkpoint is written) we re-process the list to remove items we can now scan before - * serializing to checkpointListsBuf. 
- */ - static class PendingCheckpoint + public SearchableRangeListBuilder(Range[] ranges, int goalScanDistance, Strategy strategy, Links links) { - int atIndex = -1; - int count; - - Tenured[] contents = new Tenured[10]; - - int openDirectCount, firstOpenDirect, openIndirectCount; - boolean hasClosedDirect; - - int count() - { - return count; - } - - Tenured get(int i) - { - return contents[i]; - } - - void add(Tenured tenured) - { - if (contents.length == count) - contents = Arrays.copyOf(contents, 2 * contents.length); - contents[count++] = tenured; - } - - void clear() - { - count = 0; - } - - /** - * Remove pending entries that will be scanned by the scanDistance, and update - * our bookkeeping for creating links - */ - int filter(int scanDistance, int lastIndex) - { - int matchCountModifier = 0; - int maxi = count; - count = 0; - openDirectCount = 0; - openIndirectCount = 0; - firstOpenDirect = -1; -// lastClosedDirect = -1; - - for (int i = 0; i < maxi ; ++i) - { - Tenured t = get(i); - if (t.index >= 0) - { - if (t.index + scanDistance >= lastIndex) - continue; // last index will find it with a scan - - if (t.lastIndex <= t.index + scanDistance) - continue; // all indexes will find it with a scan - - if (t.lastIndex > lastIndex) - { - // this range remains open for the next checkpoint; - // we may want to reference this list from there - // so track count and position of first one to make a determination - ++openDirectCount; - if (firstOpenDirect < 0) firstOpenDirect = count; - } - else hasClosedDirect = true; - } - else - { - // note: we over count here, as we count pointers within the chain - matchCountModifier += (t.linkLength & Integer.MAX_VALUE) - 1; // (subtract 1 to discount the pointer) - if (t.lastIndex > lastIndex) - ++openIndirectCount; - } - - if (i == count) ++count; - else contents[count++] = t; - } - - return count + matchCountModifier; - } - - /** - * Setup a link for referencing this chain later, if permitted. 
- * Must have at least two items, and at least as many direct records as indirect - */ - void setupLinkChain(TenuredSet tenured, int startIndex, int endIndex) - { - int minSizeToReference = openIndirectCount + MIN_INDIRECT_LINK_LENGTH; - if (openDirectCount >= minSizeToReference) - { - int i = firstOpenDirect; - Tenured prev = get(i++); - - while (openDirectCount > minSizeToReference) - { - Tenured e = get(i++); - if (e.index < 0) - { - --minSizeToReference; - continue; - } - - Invariants.checkState(prev.next == null); - prev.next = prev; - prev = e; - --openDirectCount; - } - - while (i < count) - { - Tenured next = get(i++); - if (next.index < 0) - continue; - - Invariants.checkState(prev.next == null); - prev.next = next; - prev = next; - } - - // may be more than one entry per item (though usually not) - int length = endIndex - startIndex; - Tenured chainEntry = tenured.addLinkEntry(prev.end, BIT31 | startIndex, prev.lastIndex, length); - prev.next = chainEntry; - if (hasClosedDirect && (startIndex > 0xfffff || (length > 0xff))) - { - // TODO (expected, testing): make sure this is tested, as not a common code path (may never be executed in normal operation) - // we have no closed ranges so iteration needs to know the end bound, but we cannot encode our bounds cheaply - // so link the first bound to the chain entry, so that on removal it triggers an update of endIndex to note - // that it can be iterated safely without an end bound - get(firstOpenDirect).next = chainEntry; - } - } - } + super(RANGE_ACCESSOR, ranges, goalScanDistance, strategy, links); + Invariants.checkArgument(goalScanDistance <= MAX_SCAN_DISTANCE); + } - @Override - public String toString() - { - return Arrays.stream(contents, 0, count) - .map(Objects::toString) - .collect(Collectors.joining(",", "[", "]")); - } + @Override + public SearchableRangeList build() + { + return build((ranges, bounds, headers, lists, maxScanAndCheckpointMatches) -> + new SearchableRangeList(ranges, bounds, headers, 
lists, maxScanAndCheckpointMatches)); } } diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java index 579bbf95ee..a18f7828cd 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java @@ -831,6 +831,18 @@ public static V getUninterruptibly(AsyncChain chain, long time, TimeUnit } } + public static V getUnchecked(AsyncChain chain) + { + try + { + return getUninterruptibly(chain); + } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + } + public static void awaitUninterruptibly(AsyncChain chain) { try diff --git a/accord-core/src/main/java/accord/utils/random/Picker.java b/accord-core/src/main/java/accord/utils/random/Picker.java index f6f07d75a4..272f17ef39 100644 --- a/accord-core/src/main/java/accord/utils/random/Picker.java +++ b/accord-core/src/main/java/accord/utils/random/Picker.java @@ -26,6 +26,18 @@ public class Picker { + public static float[] randomWeights(RandomSource random, int length) + { + float[] weights = new float[length - 1]; + float sum = 0; + for (int i = 0 ; i < weights.length ; ++i) + weights[i] = sum += random.nextFloat(); + sum += random.nextFloat(); + for (int i = 0 ; i < weights.length ; ++i) + weights[i] /= sum; + return weights; + } + static abstract class Weighted { final RandomSource random; @@ -33,7 +45,7 @@ static abstract class Weighted public Weighted(RandomSource random, int length) { - this(random, randomWeights(random, length)); + this(random, Picker.randomWeights(random, length)); } public Weighted(RandomSource random, float[] weights) @@ -42,17 +54,6 @@ public Weighted(RandomSource random, float[] weights) this.weights = weights; } - static float[] randomWeights(RandomSource random, int length) - { - float[] weights = new float[length - 1]; - float sum = 0; - for (int i = 0 ; i < weights.length ; ++i) - weights[i] = sum += 
random.nextFloat(); - sum += random.nextFloat(); - for (int i = 0 ; i < weights.length ; ++i) - weights[i] /= sum; - return weights; - } static float[] randomWeights(RandomSource random, float[] bias) { @@ -104,7 +105,7 @@ public T get() public static WeightedObjectPicker randomWeighted(RandomSource random, T[] values) { - return new WeightedObjectPicker<>(random, values, randomWeights(random, values.length)); + return new WeightedObjectPicker<>(random, values, Picker.randomWeights(random, values.length)); } public static WeightedObjectPicker randomWeighted(RandomSource random, T[] values, float[] bias) diff --git a/accord-core/src/test/java/accord/utils/Gen.java b/accord-core/src/test/java/accord/utils/Gen.java index af86340b7e..04b6e63a4d 100644 --- a/accord-core/src/test/java/accord/utils/Gen.java +++ b/accord-core/src/test/java/accord/utils/Gen.java @@ -99,6 +99,21 @@ default Stream asStream(RandomSource rs) return Stream.generate(() -> next(rs)); } + interface Int2IntMapFunction + { + int applyAsInt(RandomSource rs, int value); + } + + interface Int2LongMapFunction + { + long applyAsLong(RandomSource rs, int value); + } + + interface Long2LongMapFunction + { + long applyAsLong(RandomSource rs, long value); + } + interface IntGen extends Gen { int nextInt(RandomSource random); @@ -114,6 +129,16 @@ default IntGen mapAsInt(IntUnaryOperator fn) return r -> fn.applyAsInt(nextInt(r)); } + default IntGen mapAsInt(Int2IntMapFunction fn) + { + return r -> fn.applyAsInt(r, nextInt(r)); + } + + default LongGen mapAsLong(Int2LongMapFunction fn) + { + return r -> fn.applyAsLong(r, nextInt(r)); + } + default Gen.IntGen filterAsInt(IntPredicate fn) { return rs -> { @@ -159,6 +184,11 @@ default LongGen mapAsLong(LongUnaryOperator fn) return r -> fn.applyAsLong(nextLong(r)); } + default LongGen mapAsLong(Long2LongMapFunction fn) + { + return r -> fn.applyAsLong(r, nextLong(r)); + } + default Gen.LongGen filterAsLong(LongPredicate fn) { return rs -> { diff --git 
a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java index 35e4567f61..3fc632e7f2 100644 --- a/accord-core/src/test/java/accord/utils/Gens.java +++ b/accord-core/src/test/java/accord/utils/Gens.java @@ -35,8 +35,12 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; import java.util.stream.Stream; +import accord.utils.random.Picker; + public class Gens { private Gens() { } @@ -51,6 +55,11 @@ public static Gen constant(Supplier constant) return ignore -> constant.get(); } + public static Gen.IntGen pickInt(int... ts) + { + return rs -> ts[rs.nextInt(0, ts.length)]; + } + public static Gen pick(T... ts) { return pick(Arrays.asList(ts)); @@ -116,6 +125,253 @@ public static Gen.IntGen pickZipf(int[] array) }; } + public static Gen.LongGen pickZipf(long[] array) + { + if (array == null || array.length == 0) + throw new IllegalArgumentException("Empty array given"); + if (array.length == 1) + return ignore -> array[0]; + BigDecimal[] weights = new BigDecimal[array.length]; + BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.length)); + weights[0] = base; + for (int i = 1; i < array.length; i++) + weights[i] = base.divide(BigDecimal.valueOf(i + 1), RoundingMode.UP); + BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO, BigDecimal::add); + + return rs -> { + BigDecimal value = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights); + for (int i = 0; i < weights.length; i++) + { + value = value.subtract(weights[i]); + if (value.compareTo(BigDecimal.ZERO) <= 0) + return array[i]; + } + return array[array.length - 1]; + }; + } + + public static Gen pickZipf(T... 
array) + { + return pickZipf(Arrays.asList(array)); + } + + public static Gen pickZipf(List array) + { + if (array == null || array.isEmpty()) + throw new IllegalArgumentException("Empty array given"); + if (array.size() == 1) + return ignore -> array.get(0); + BigDecimal[] weights = new BigDecimal[array.size()]; + BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.size())); + weights[0] = base; + for (int i = 1; i < array.size(); i++) + weights[i] = base.divide(BigDecimal.valueOf(i + 1), RoundingMode.UP); + BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO, BigDecimal::add); + + return rs -> { + BigDecimal value = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights); + for (int i = 0; i < weights.length; i++) + { + value = value.subtract(weights[i]); + if (value.compareTo(BigDecimal.ZERO) <= 0) + return array.get(i); + } + return array.get(array.size() - 1); + }; + } + + public static Gen randomWeights(int[] array) + { + return rs -> { + float[] weights = Picker.randomWeights(rs, array.length); + return r -> array[index(r, weights)]; + }; + } + + public static Gen randomWeights(long[] array) + { + return rs -> { + float[] weights = Picker.randomWeights(rs, array.length); + return r -> array[index(r, weights)]; + }; + } + + public static Gen> randomWeights(T[] array) + { + return rs -> { + float[] weights = Picker.randomWeights(rs, array.length); + return r -> array[index(r, weights)]; + }; + } + + public static Gen> randomWeights(List array) + { + return rs -> { + float[] weights = Picker.randomWeights(rs, array.size()); + return r -> array.get(index(r, weights)); + }; + } + + private static int index(RandomSource rs, float[] weights) + { + int i = Arrays.binarySearch(weights, rs.nextFloat()); + if (i < 0) i = -1 - i; + return i; + } + + public static Gen mixedDistribution(int minInclusive, int maxExclusive) + { + int domainSize = (maxExclusive - minInclusive + 1); + if (domainSize < 0) + throw new IllegalArgumentException("Range is too 
large; min=" + minInclusive + ", max=" + maxExclusive); + int[] array, indexes; + if (domainSize > 200) // randomly selected + { + int numBuckets = 10; + int delta = domainSize / numBuckets; + array = new int[numBuckets]; + for (int i = 0; i < numBuckets; i++) + array[i] = minInclusive + i * delta; + indexes = IntStream.range(0, array.length).toArray(); + } + else + { + array = IntStream.range(minInclusive, maxExclusive).toArray(); + indexes = null; + } + return rs -> { + switch (rs.nextInt(0, 4)) + { + case 0: // uniform + return r -> r.nextInt(minInclusive, maxExclusive); + case 1: // median biased + int median = rs.nextInt(minInclusive, maxExclusive); + return r -> r.nextBiasedInt(minInclusive, median, maxExclusive); + case 2: // zipf + if (indexes == null) + return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(array) : array); + return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(indexes) : indexes).mapAsInt((r, index) -> { + int start = array[index]; + int end = index == array.length - 1 ? maxExclusive : array[index + 1]; + return r.nextInt(start, end); + }); + case 3: // random weight + if (indexes == null) + return randomWeights(array).next(rs); + return randomWeights(indexes).next(rs).mapAsInt((r, index) -> { + int start = array[index]; + int end = index == array.length - 1 ? 
maxExclusive : array[index + 1]; + return r.nextInt(start, end); + }); + default: + throw new AssertionError(); + } + }; + } + + private static int[] reverseAndCopy(int[] array) + { + array = Arrays.copyOf(array, array.length); + for (int i = 0, mid = array.length / 2, j = array.length - 1; i < mid; i++, j--) + { + int tmp = array[i]; + array[i] = array[j]; + array[j] = tmp; + } + return array; + } + + public static Gen mixedDistribution(long minInclusive, long maxExclusive) + { + long domainSize = (maxExclusive - minInclusive + 1); + if (domainSize < 0) + throw new IllegalArgumentException("Range is too large; min=" + minInclusive + ", max=" + maxExclusive); + long[] array; + int[] indexes; + if (domainSize > 200) // randomly selected + { + int numBuckets = 10; + long delta = domainSize / numBuckets; + array = new long[numBuckets]; + for (int i = 0; i < numBuckets; i++) + array[i] = minInclusive + i * delta; + indexes = IntStream.range(0, array.length).toArray(); + } + else + { + array = LongStream.range(minInclusive, maxExclusive).toArray(); + indexes = null; + } + return rs -> { + switch (rs.nextInt(0, 4)) + { + case 0: // uniform + return r -> r.nextLong(minInclusive, maxExclusive); + case 1: // median biased + long median = rs.nextLong(minInclusive, maxExclusive); + return r -> r.nextBiasedLong(minInclusive, median, maxExclusive); + case 2: // zipf + if (indexes == null) + return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(array) : array); + return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(indexes) : indexes).mapAsLong((r, index) -> { + long start = array[index]; + long end = index == array.length - 1 ? maxExclusive : array[index + 1]; + return r.nextLong(start, end); + }); + case 3: // random weight + if (indexes == null) + return randomWeights(array).next(rs); + return randomWeights(indexes).next(rs).mapAsLong((r, index) -> { + long start = array[index]; + long end = index == array.length - 1 ? 
maxExclusive : array[index + 1]; + return r.nextLong(start, end); + }); + default: + throw new AssertionError(); + } + }; + } + + private static long[] reverseAndCopy(long[] array) + { + array = Arrays.copyOf(array, array.length); + for (int i = 0, mid = array.length / 2, j = array.length - 1; i < mid; i++, j--) + { + long tmp = array[i]; + array[i] = array[j]; + array[j] = tmp; + } + return array; + } + + public static Gen> mixedDistribution(T... list) + { + return mixedDistribution(Arrays.asList(list)); + } + + public static Gen> mixedDistribution(List list) + { + return rs -> { + switch (rs.nextInt(0, 3)) + { + case 0: // uniform + return r -> list.get(rs.nextInt(0, list.size())); + case 1: // zipf + List array = list; + if (rs.nextBoolean()) + { + array = new ArrayList<>(list); + Collections.reverse(array); + } + return pickZipf(array); + case 2: // random weight + return randomWeights(list).next(rs); + default: + throw new AssertionError(); + } + }; + } + public static Gen charArray(Gen.IntGen sizes, char[] domain) { return charArray(sizes, domain, (a, b) -> true); @@ -265,6 +521,11 @@ public Gen.IntGen between(int min, int max) return r -> r.nextInt(min, max); return r -> r.nextInt(min, max + 1); } + + public Gen mixedDistribution(int minInclusive, int maxExclusive) + { + return Gens.mixedDistribution(minInclusive, maxExclusive); + } } public static class LongDSL { @@ -296,6 +557,11 @@ public > Gen all(Class klass) return pick(klass.getEnumConstants()); } + public > Gen> allMixedDistribution(Class klass) + { + return mixedDistribution(klass.getEnumConstants()); + } + public > Gen allWithWeights(Class klass, int... 
weights) { T[] constants = klass.getEnumConstants(); diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java index 9c81375818..0cee3ccdb8 100644 --- a/accord-core/src/test/java/accord/utils/Property.java +++ b/accord-core/src/test/java/accord/utils/Property.java @@ -198,7 +198,7 @@ private static String propertyError(Common input, Throwable cause, Object... { sb.append("Values:\n"); for (int i = 0; i < values.length; i++) - sb.append('\t').append(i).append(" = ").append(normalizeValue(values[i])).append('\n'); + sb.append('\t').append(i).append(" = ").append(normalizeValue(values[i])).append(": ").append(values[i] == null ? "unknown type" : values[i].getClass().getCanonicalName()).append('\n'); } return sb.toString(); } diff --git a/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java b/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java new file mode 100644 index 0000000000..d31c95c933 --- /dev/null +++ b/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.utils; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.function.BiConsumer; + +import org.junit.jupiter.api.Test; + +import accord.impl.IntKey; +import accord.primitives.Range; +import org.assertj.core.api.Assertions; + +import static accord.utils.Property.qt; + +class SearchableRangeListTest +{ + @Test + public void fullWorld() + { + int numRanges = 1000; + List ranges = new ArrayList<>(numRanges); + for (int i = 0; i < numRanges; i++) + ranges.add(IntKey.range(i, i + 1)); + + SearchableRangeList list = SearchableRangeList.build(ranges.toArray(new Range[0])); + class Counter { int value;} + BiConsumer test = (rangeStart, rangeEnd) -> { + Counter counter = new Counter(); + list.forEach(IntKey.range(rangeStart, rangeEnd), (a, b, c, d, e) -> { + counter.value++; + }, (a, b, c, d, start, end) -> { + counter.value += (end - start + 1); + }, 0, 0, 0, 0, 0); + Assertions.assertThat(counter.value).isEqualTo(rangeEnd - rangeStart + 1); + }; + for (int i = 0; i < numRanges; i++) + test.accept(i, numRanges); + for (int i = 0; i < numRanges; i++) + test.accept(0, numRanges - i); + } + + @Test + public void random() + { + qt().check(rs -> { + int numRanges = rs.nextInt(1000, 10000); + List ranges = new ArrayList<>(numRanges); + for (int i = 0; i < numRanges; i++) + { + int start = rs.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE - 1000); + int offset = rs.nextInt(1, 1000); + ranges.add(IntKey.range(start, start + offset)); + } + ranges.sort(Comparator.comparing(Range::start)); + + SearchableRangeList list = SearchableRangeList.build(ranges.toArray(new Range[0])); + for (int i = 0; i < 1000; i++) + { + Range range; + int selection = rs.nextInt(0, 3); + switch (selection) + { + case 0: + range = rs.pick(ranges); + break; + case 1: + int rangeStart = rs.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE - 1000); + int offset = rs.nextInt(1, 1000); + range = IntKey.range(rangeStart, rangeStart + offset); + 
break; + case 2: + int start = rs.nextInt(0, ranges.size()); + int end = start + rs.nextInt(0, (ranges.size() - start)); + range = IntKey.range(((IntKey) ranges.get(start).start()).key, ((IntKey) ranges.get(end).end()).key); + break; + default: + throw new IllegalStateException("Unhandled value"); + } + List expected = new ArrayList<>(); + for (Range r : ranges) + { + if (range.compareIntersecting(r) == 0) + expected.add(r); + } + List actual = new ArrayList<>(expected.size()); + list.forEach(range, (a, b, c, d, idx) -> { + actual.add(list.ranges[idx]); + }, (a, b, c, d, start, end) -> { + for (int j = start; j < end; j++) + actual.add(list.ranges[j]); + }, 0, 0, 0, 0, 0); + + Assertions.assertThat(actual).isEqualTo(expected); + } + }); + } +} \ No newline at end of file diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle b/buildSrc/src/main/groovy/accord.java-conventions.gradle index f77a0c07b7..32cd21b1d4 100644 --- a/buildSrc/src/main/groovy/accord.java-conventions.gradle +++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle @@ -29,7 +29,7 @@ repositories { } compileJava { - sourceCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_11 dependsOn(':rat') } From 73a124b396fadbe886315bd06dfc3d756aad4b0f Mon Sep 17 00:00:00 2001 From: David Capwell Date: Mon, 4 Mar 2024 12:18:02 -0800 Subject: [PATCH 02/13] include median bias for list as well --- accord-core/src/test/java/accord/utils/Gens.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java index 3fc632e7f2..8bcea2541d 100644 --- a/accord-core/src/test/java/accord/utils/Gens.java +++ b/accord-core/src/test/java/accord/utils/Gens.java @@ -352,11 +352,14 @@ public static Gen> mixedDistribution(T... 
list) public static Gen> mixedDistribution(List list) { return rs -> { - switch (rs.nextInt(0, 3)) + switch (rs.nextInt(0, 4)) { case 0: // uniform return r -> list.get(rs.nextInt(0, list.size())); - case 1: // zipf + case 1: // median biased + int median = rs.nextInt(0, list.size()); + return r -> list.get(r.nextBiasedInt(0, median, list.size())); + case 2: // zipf List array = list; if (rs.nextBoolean()) { @@ -364,7 +367,7 @@ public static Gen> mixedDistribution(List list) Collections.reverse(array); } return pickZipf(array); - case 2: // random weight + case 3: // random weight return randomWeights(list).next(rs); default: throw new AssertionError(); From bcb4d74919fa6748fd3179de956683056f78ece4 Mon Sep 17 00:00:00 2001 From: David Capwell Date: Mon, 4 Mar 2024 12:40:45 -0800 Subject: [PATCH 03/13] remove endInclusive as it's dead code --- .../java/accord/utils/CheckpointIntervalArrayBuilder.java | 1 - .../main/java/accord/utils/SearchableRangeListBuilder.java | 6 ------ 2 files changed, 7 deletions(-) diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java index b82c6fda3a..95e1d86e5a 100644 --- a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java +++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java @@ -69,7 +69,6 @@ public enum Links public interface Accessor { - boolean endInclusive(Ranges ranges); int size(Ranges ranges); Range get(Ranges ranges, int index); RoutingKey start(Ranges ranges, int index); diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java index f25c7867fb..546ee16a8d 100644 --- a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java +++ b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java @@ -32,12 +32,6 @@ public class SearchableRangeListBuilder 
extends CheckpointIntervalArrayBuilder RANGE_ACCESSOR = new Accessor<>() { - @Override - public boolean endInclusive(Range[] ranges) - { - return ranges[0].endInclusive(); - } - @Override public int size(Range[] ranges) { From 7b553ef138859e0e244673ef55be27c144d8b4fd Mon Sep 17 00:00:00 2001 From: David Capwell Date: Mon, 4 Mar 2024 13:43:45 -0800 Subject: [PATCH 04/13] added mixedDistribution for boolean as this was in a test --- .../src/test/java/accord/utils/Gens.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java index 8bcea2541d..ca9c22de75 100644 --- a/accord-core/src/test/java/accord/utils/Gens.java +++ b/accord-core/src/test/java/accord/utils/Gens.java @@ -499,6 +499,28 @@ public Boolean next(RandomSource rs) } }; } + + public Gen> mixedDistribution() + { + return rs -> { + int selection = rs.nextInt(0, 4); + switch (selection) + { + case 0: // uniform 50/50 + return r -> r.nextBoolean(); + case 1: // variable frequency + var freq = rs.nextFloat(); + return r -> r.decide(freq); + case 2: // fixed result + boolean result = rs.nextBoolean(); + return ignore -> result; + case 3: // biased repeating runs + return biasedRepeatingRuns(rs.nextDouble(), rs.nextInt(1, 100)); + default: + throw new IllegalStateException("Unexpected int for bool selection: " + selection); + } + }; + } } public static class IntDSL From f611097dc1206af0c99807ebd17f60bf1bb4d9e2 Mon Sep 17 00:00:00 2001 From: David Capwell Date: Thu, 7 Mar 2024 22:01:32 -0800 Subject: [PATCH 05/13] lower the pick requirements on set to super --- accord-core/src/main/java/accord/utils/RandomSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java b/accord-core/src/main/java/accord/utils/RandomSource.java index 067b872ca0..f14c7a14b2 100644 --- a/accord-core/src/main/java/accord/utils/RandomSource.java +++ 
b/accord-core/src/main/java/accord/utils/RandomSource.java @@ -286,7 +286,7 @@ default long pickLong(long[] array, int offset, int length) return array[nextInt(offset, offset + length)]; } - default > T pick(Set set) + default > T pick(Set set) { List values = new ArrayList<>(set); // Non-ordered sets may have different iteration order on different environments, which would make a seed produce different histories! From 3e6f65af4c2559ba4a5f50d3e7b40fcba82ca3c3 Mon Sep 17 00:00:00 2001 From: David Capwell Date: Thu, 7 Mar 2024 22:01:51 -0800 Subject: [PATCH 06/13] added a new stateful testing api --- .../src/test/java/accord/utils/Gens.java | 10 ++ .../src/test/java/accord/utils/Property.java | 115 +++++++++++++++++- 2 files changed, 124 insertions(+), 1 deletion(-) diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java index ca9c22de75..e6a26bea7c 100644 --- a/accord-core/src/test/java/accord/utils/Gens.java +++ b/accord-core/src/test/java/accord/utils/Gens.java @@ -55,6 +55,16 @@ public static Gen constant(Supplier constant) return ignore -> constant.get(); } + public static Gen oneOf(Gen... gens) + { + return oneOf(Arrays.asList(gens)); + } + + public static Gen oneOf(List> gens) + { + return rs -> rs.pick(gens).next(rs); + } + public static Gen.IntGen pickInt(int... 
ts) { return rs -> ts[rs.nextInt(0, ts.length)]; diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java index 0cee3ccdb8..4b046f47c6 100644 --- a/accord-core/src/test/java/accord/utils/Property.java +++ b/accord-core/src/test/java/accord/utils/Property.java @@ -23,12 +23,16 @@ import accord.utils.async.AsyncResults; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import javax.annotation.Nullable; public class Property @@ -168,7 +172,10 @@ private static Object normalizeValue(Object value) } try { - return value.toString(); + String result = value.toString(); + if (result != null && result.length() > 100 && value instanceof Collection) + result = ((Collection) value).stream().map(o -> "\n\t " + o).collect(Collectors.joining(",", "[", "]")); + return result; } catch (Throwable t) { @@ -380,4 +387,110 @@ public static ForBuilder qt() { return new ForBuilder(); } + + public static StatefulBuilder stateful() + { + return new StatefulBuilder(); + } + + public static class StatefulBuilder extends Common + { + protected int steps = 100; + + public StatefulBuilder() + { + examples = 100; + } + + @SuppressWarnings("rawtypes") + public void check(Commands commands) + { + RandomSource rs = new DefaultRandom(seed); + for (int i = 0; i < examples; i++) + { + State state = null; + List history = new ArrayList<>(); + try + { + checkInterrupted(); + + state = commands.genInitialState().next(rs); + SystemUnderTest sut = commands.createSut(state); + + try + { + for (int j = 0; j < steps; j++) + { + Gen> cmdGen = commands.commands(state); + Command cmd = cmdGen.next(rs); + for (int a = 0; 
cmd.checkPreconditions(state) != PreCheckResult.Ok && a < 42; a++) + { + if (a == 41) + throw new IllegalArgumentException("Unable to find next command"); + cmd = cmdGen.next(rs); + } + history.add(cmd.detailed(state)); + Object stateResult = cmd.apply(state); + cmd.checkPostconditions(state, stateResult, + sut, cmd.run(sut)); + } + } + finally + { + commands.destroySut(sut); + commands.destroyState(state); + } + } + catch (Throwable t) + { + throw new PropertyError(propertyError(this, t, state, history), t); + } + if (pure) + { + seed = rs.nextLong(); + rs.setSeed(seed); + } + } + } + } + + public enum PreCheckResult { Ok, Ignore } + public interface Command + { + default PreCheckResult checkPreconditions(State state) {return PreCheckResult.Ok;} + Result apply(State state); + Result run(SystemUnderTest sut); + default void checkPostconditions(State state, Result expected, + SystemUnderTest sut, Result actual) {} + default String detailed(State state) {return this.toString();} + } + + public interface UnitCommand extends Command + { + void applyUnit(State state); + void runUnit(SystemUnderTest sut); + + @Override + default Void apply(State state) + { + applyUnit(state); + return null; + } + + @Override + default Void run(SystemUnderTest sut) + { + runUnit(sut); + return null; + } + } + + public interface Commands + { + Gen genInitialState(); + SystemUnderTest createSut(State state); + default void destroyState(State state) {} + default void destroySut(SystemUnderTest sut) {} + Gen> commands(State state); + } } From 48b2e6083c1d58d067de06e0312217befc9aec7d Mon Sep 17 00:00:00 2001 From: David Capwell Date: Thu, 7 Mar 2024 23:38:50 -0800 Subject: [PATCH 07/13] support NavigableSet picking --- accord-core/src/main/java/accord/utils/RandomSource.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java b/accord-core/src/main/java/accord/utils/RandomSource.java index f14c7a14b2..ed971aac0d 100644 --- 
a/accord-core/src/main/java/accord/utils/RandomSource.java +++ b/accord-core/src/main/java/accord/utils/RandomSource.java @@ -29,6 +29,8 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; +import com.google.common.collect.Iterables; + import accord.utils.random.Picker; // TODO (expected): merge with C* RandomSource @@ -286,6 +288,12 @@ default long pickLong(long[] array, int offset, int length) return array[nextInt(offset, offset + length)]; } + default T pick(NavigableSet set) + { + int offset = nextInt(0, set.size()); + return Iterables.get(set, offset); + } + default > T pick(Set set) { List values = new ArrayList<>(set); From 91ea7de225a31239c2cdf53d295f8c67d41e27a9 Mon Sep 17 00:00:00 2001 From: David Capwell Date: Fri, 8 Mar 2024 09:33:47 -0800 Subject: [PATCH 08/13] change stateful defaults and added oneOf with weight --- accord-core/src/test/java/accord/utils/Gens.java | 6 ++++++ accord-core/src/test/java/accord/utils/Property.java | 10 ++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java index e6a26bea7c..3723fc60b6 100644 --- a/accord-core/src/test/java/accord/utils/Gens.java +++ b/accord-core/src/test/java/accord/utils/Gens.java @@ -65,6 +65,12 @@ public static Gen oneOf(List> gens) return rs -> rs.pick(gens).next(rs); } + public static Gen oneOf(Map, Integer> values) + { + Gen> gen = pick(values); + return rs -> gen.next(rs).next(rs); + } + public static Gen.IntGen pickInt(int... 
ts) { return rs -> ts[rs.nextInt(0, ts.length)]; diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java index 4b046f47c6..499cbf4c3b 100644 --- a/accord-core/src/test/java/accord/utils/Property.java +++ b/accord-core/src/test/java/accord/utils/Property.java @@ -395,11 +395,17 @@ public static StatefulBuilder stateful() public static class StatefulBuilder extends Common { - protected int steps = 100; + protected int steps = 1000; public StatefulBuilder() { - examples = 100; + examples = 500; + } + + public StatefulBuilder withSteps(int steps) + { + this.steps = steps; + return this; } @SuppressWarnings("rawtypes") From c976cb7d455ff9ccf70adcefcbc8ea39148e9a97 Mon Sep 17 00:00:00 2001 From: David Capwell Date: Tue, 12 Mar 2024 14:06:36 -0700 Subject: [PATCH 09/13] allocate the list upfront rather than incremental --- accord-core/src/test/java/accord/utils/Property.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java index 499cbf4c3b..6733e06fc1 100644 --- a/accord-core/src/test/java/accord/utils/Property.java +++ b/accord-core/src/test/java/accord/utils/Property.java @@ -415,7 +415,7 @@ public void check(Commands comm for (int i = 0; i < examples; i++) { State state = null; - List history = new ArrayList<>(); + List history = new ArrayList<>(steps); try { checkInterrupted(); From ed9b708088121a53836275c1f4cb62728e63a696 Mon Sep 17 00:00:00 2001 From: David Capwell Date: Wed, 13 Mar 2024 13:29:05 -0700 Subject: [PATCH 10/13] everything can now throw --- .../src/test/java/accord/utils/Property.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java index 6733e06fc1..ef93e5da81 100644 --- 
a/accord-core/src/test/java/accord/utils/Property.java +++ b/accord-core/src/test/java/accord/utils/Property.java @@ -464,27 +464,27 @@ public enum PreCheckResult { Ok, Ignore } public interface Command { default PreCheckResult checkPreconditions(State state) {return PreCheckResult.Ok;} - Result apply(State state); - Result run(SystemUnderTest sut); + Result apply(State state) throws Throwable; + Result run(SystemUnderTest sut) throws Throwable; default void checkPostconditions(State state, Result expected, - SystemUnderTest sut, Result actual) {} + SystemUnderTest sut, Result actual) throws Throwable {} default String detailed(State state) {return this.toString();} } public interface UnitCommand extends Command { - void applyUnit(State state); - void runUnit(SystemUnderTest sut); + void applyUnit(State state) throws Throwable; + void runUnit(SystemUnderTest sut) throws Throwable; @Override - default Void apply(State state) + default Void apply(State state) throws Throwable { applyUnit(state); return null; } @Override - default Void run(SystemUnderTest sut) + default Void run(SystemUnderTest sut) throws Throwable { runUnit(sut); return null; @@ -493,10 +493,10 @@ default Void run(SystemUnderTest sut) public interface Commands { - Gen genInitialState(); - SystemUnderTest createSut(State state); - default void destroyState(State state) {} - default void destroySut(SystemUnderTest sut) {} - Gen> commands(State state); + Gen genInitialState() throws Throwable; + SystemUnderTest createSut(State state) throws Throwable; + default void destroyState(State state) throws Throwable {} + default void destroySut(SystemUnderTest sut) throws Throwable {} + Gen> commands(State state) throws Throwable; } } From fa7541268a96ba5d08059232a76293367c649997 Mon Sep 17 00:00:00 2001 From: David Capwell Date: Thu, 14 Mar 2024 13:58:31 -0700 Subject: [PATCH 11/13] improve error msg --- .../src/test/java/accord/utils/Property.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 
deletions(-) diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java index ef93e5da81..ab6fe5e05d 100644 --- a/accord-core/src/test/java/accord/utils/Property.java +++ b/accord-core/src/test/java/accord/utils/Property.java @@ -183,7 +183,7 @@ private static Object normalizeValue(Object value) } } - private static String propertyError(Common input, Throwable cause, Object... values) + private static StringBuilder propertyErrorCommon(Common input, Throwable cause) { StringBuilder sb = new StringBuilder(); // return "Seed=" + seed + "\nExamples=" + examples; @@ -201,6 +201,12 @@ private static String propertyError(Common input, Throwable cause, Object... msg = cause.getClass().getCanonicalName(); sb.append(msg).append('\n'); } + return sb; + } + + private static String propertyError(Common input, Throwable cause, Object... values) + { + StringBuilder sb = propertyErrorCommon(input, cause); if (values != null) { sb.append("Values:\n"); @@ -210,6 +216,18 @@ private static String propertyError(Common input, Throwable cause, Object... return sb.toString(); } + private static String statefulPropertyError(StatefulBuilder input, Throwable cause, Object state, List history) + { + StringBuilder sb = propertyErrorCommon(input, cause); + sb.append("Steps: ").append(input.steps).append('\n'); + sb.append("Values:\n"); + sb.append("\tState: ").append(state).append(": ").append(state == null ? 
"unknown type" : state.getClass().getCanonicalName()).append('\n'); + sb.append("\tHistory:").append('\n'); + for (var event : history) + sb.append("\t\t").append(event).append('\n'); + return sb.toString(); + } + public interface FailingConsumer { void accept(A value) throws Exception; @@ -449,7 +467,7 @@ public void check(Commands comm } catch (Throwable t) { - throw new PropertyError(propertyError(this, t, state, history), t); + throw new PropertyError(statefulPropertyError(this, t, state, history), t); } if (pure) { From 154efdccaf68adfee2708bf7fa1f7e80f0d363eb Mon Sep 17 00:00:00 2001 From: ci worker Date: Thu, 28 Mar 2024 15:43:38 -0700 Subject: [PATCH 12/13] remove redundantBefore from eq as we don't serialize that, so cache check fails --- accord-core/src/main/java/accord/local/CommandsForKey.java | 1 - 1 file changed, 1 deletion(-) diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java index 0e24d675ca..9e628b924b 100644 --- a/accord-core/src/main/java/accord/local/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/CommandsForKey.java @@ -1698,7 +1698,6 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) return false; CommandsForKey that = (CommandsForKey) o; return Objects.equals(key, that.key) - && Objects.equals(redundantBefore, that.redundantBefore) && Arrays.equals(txns, that.txns); } From a5fc205034e6e580dc3b422c2284d4e59d8970bf Mon Sep 17 00:00:00 2001 From: ci worker Date: Thu, 28 Mar 2024 20:57:51 -0700 Subject: [PATCH 13/13] when adding a timeout mark the test not pure --- accord-core/src/test/java/accord/utils/Property.java | 1 + 1 file changed, 1 insertion(+) diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java index ab6fe5e05d..50eba0c19e 100644 --- a/accord-core/src/test/java/accord/utils/Property.java +++ b/accord-core/src/test/java/accord/utils/Property.java 
@@ -79,6 +79,7 @@ public T withPure(boolean pure) public T withTimeout(Duration timeout) { this.timeout = timeout; + this.pure = false; return (T) this; }