Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public class ExponentiallyDecayingReservoir implements Reservoir {
private static final int DEFAULT_SIZE = 1028;
private static final double DEFAULT_ALPHA = 0.015;
private static final long RESCALE_THRESHOLD = TimeUnit.HOURS.toNanos(1);
private static final long DEFAULT_RESCALE_THRESHOLD = TimeUnit.HOURS.toNanos(1);

private final ConcurrentSkipListMap<Double, WeightedSample> values;
private final ReentrantReadWriteLock lock;
Expand All @@ -34,6 +34,7 @@ public class ExponentiallyDecayingReservoir implements Reservoir {
private volatile long startTime;
private final AtomicLong lastScaleTick;
private final Clock clock;
private final long rescaleThreshold;

/**
* Creates a new {@link ExponentiallyDecayingReservoir} of 1028 elements, which offers a 99.9%
Expand Down Expand Up @@ -72,6 +73,7 @@ public ExponentiallyDecayingReservoir(int size, double alpha, Clock clock) {
this.count = new AtomicLong(0);
this.startTime = currentTimeInSeconds();
this.lastScaleTick = new AtomicLong(clock.getTick());
this.rescaleThreshold = calculateRescaleThreshold(alpha);
}

@Override
Expand Down Expand Up @@ -118,7 +120,7 @@ public void update(long value, long timestamp) {
private void rescaleIfNeeded() {
final long now = clock.getTick();
final long lastScaleTickSnapshot = lastScaleTick.get();
if (now - lastScaleTickSnapshot >= RESCALE_THRESHOLD) {
if (now - lastScaleTickSnapshot >= rescaleThreshold) {
rescale(now, lastScaleTickSnapshot);
}
}
Expand Down Expand Up @@ -204,4 +206,12 @@ private void lockForRegularUsage() {
private void unlockForRegularUsage() {
lock.readLock().unlock();
}

private long calculateRescaleThreshold(double alpha) {
long upperbound = (long) Math.floor(Math.log(Double.MAX_VALUE) / alpha);
if (upperbound == 0) {
throw new IllegalStateException("Alpha value resulted in a rescale threshold of 0 seconds");
}
return upperbound >= DEFAULT_RESCALE_THRESHOLD ? DEFAULT_RESCALE_THRESHOLD : TimeUnit.SECONDS.toNanos(upperbound / 2);
}
Comment on lines +210 to +216
Comment on lines +210 to +216
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,20 @@ public void clockWrapShouldNotRescale() {
testShortPeriodShouldNotRescale(Long.MAX_VALUE - TimeUnit.MINUTES.toNanos(30));
}

@Test
public void thresholdScaleWithAlpha() {
// todo(nickbar01234): Update test for lock-free implementation after consensus
// MAX_DOUBLE = e^(0.75x) means no new samples are added after ~16minutes
final ManualClock clock = new ManualClock(0);
final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(10, 0.75, clock);
reservoir.update(1000);
clock.addSeconds(16 * 60);
reservoir.update(2000);

Snapshot snapshot = reservoir.getSnapshot();
assertThat(snapshot.size()).isEqualTo(2);
}

private void testShortPeriodShouldNotRescale(long startTimeNanos) {
final ManualClock clock = new ManualClock(startTimeNanos);
final Reservoir reservoir = reservoirFactory.create(10, 1, clock);
Expand All @@ -396,7 +410,7 @@ private void testShortPeriodShouldNotRescale(long startTimeNanos) {
// wait for 10 millis and take snapshot.
// this should not trigger a rescale. Note that the number of samples will be reduced to 0
// because scaling factor equal to zero will remove all existing entries after rescale.
clock.addSeconds(20 * 60);
clock.addSeconds(5 * 60);
Comment on lines 410 to +413
Snapshot snapshot = reservoir.getSnapshot();
assertThat(snapshot.getMax()).isEqualTo(1000);
assertThat(snapshot.getMean()).isEqualTo(1000);
Expand Down