Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"java.jdt.ls.vmargs": "-XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -Dsun.zip.disableMemoryMapping=true -Xmx4G -Xms100m -Xlog:disable"
}
5 changes: 2 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
<version.disruptor>3.4.4</version.disruptor>
<version.easymock>4.0.2</version.easymock>
<version.eclipse-lifecycle>1.0.0</version.eclipse-lifecycle>
<version.fast-classpath-scanner>3.1.13</version.fast-classpath-scanner>
<version.flink>1.1.4</version.flink>
<version.google-flatbuffers>23.5.26</version.google-flatbuffers>
<version.guava>33.0.0-jre</version.guava>
Expand Down Expand Up @@ -931,7 +930,7 @@
<artifactId>maven-failsafe-plugin</artifactId>
<version>${version.maven-failsafe-plugin}</version>
<configuration>
<argLine>${failsafeArgLine} -Djava.library.path=${project.build.directory}/native/META-INF/native/libnetty_tcnative_linux_x86_64.so -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -Dfile.encoding=UTF8 -Duser.timezone=GMT -Xmx1024m -Dapple.awt.UIElement=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc= -Dzookeeper.sasl.client=false</argLine>
<argLine>${failsafeArgLine} -Djava.library.path=${project.build.directory}/native/META-INF/native/libnetty_tcnative_linux_x86_64.so -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -Dfile.encoding=UTF8 -Duser.timezone=GMT -Xmx1024m -Dapple.awt.UIElement=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc= -Dzookeeper.sasl.client=false</argLine>
<includes>
<include>**/*IT.java</include>
</includes>
Expand Down Expand Up @@ -989,7 +988,7 @@
<threadCountClasses>1</threadCountClasses>
<threadCountMethods>0</threadCountMethods>
<threadCountSuites>0</threadCountSuites>
<argLine>${sureFireArgLine} -Djava.library.path=${project.build.directory}/native/META-INF/native/libnetty_tcnative_linux_x86_64.so -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -Dfile.encoding=UTF8 -Duser.timezone=GMT -Xmx1024m -Dapple.awt.UIElement=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc= -Dzookeeper.sasl.client=false</argLine>
<argLine>${sureFireArgLine} -Djava.library.path=${project.build.directory}/native/META-INF/native/libnetty_tcnative_linux_x86_64.so -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -Dfile.encoding=UTF8 -Duser.timezone=GMT -Xmx1024m -Dapple.awt.UIElement=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc= -Dzookeeper.sasl.client=false</argLine>
<runOrder>random</runOrder>
<excludedGroups>timely.test.IntegrationTest</excludedGroups>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
}

private void seekIfNecessary() {
if (super.hasTop()) {
while (super.hasTop()) {
Key top = super.getTopKey();
// If less than any configured ageoff, then we want to return this
// K,V
Expand All @@ -85,7 +85,7 @@ private void seekIfNecessary() {
String metricName = MetricAdapter.decodeRowKey(top).getFirst();
handleNewMetricName(metricName);
seekPastAgedOffMetricData(top.getRow().getBytes(), metricName, this.maxAgeOff);
return;
continue;
}
if (isNextMetricTheSame(top.getRow())) {
// this metric name is the same as previous
Expand All @@ -95,6 +95,7 @@ private void seekIfNecessary() {
String metricName = new String(prevMetricBytes.copyBytes(), UTF_8);
log.trace("Current metric is older than age off for metric {}, seeking to start of valid data", metricName);
seekPastAgedOffMetricData(top.getRow().getBytes(), metricName, prevAgeOff);
continue;
}
} else {
// Metric name is different or prev information is not set
Expand All @@ -103,8 +104,10 @@ private void seekIfNecessary() {
if (currentTime - top.getTimestamp() > prevAgeOff) {
log.trace("New metric found, but older than age off for metric {}, seeking to start of valid data", metricName);
seekPastAgedOffMetricData(top.getRow().getBytes(), metricName, prevAgeOff);
continue;
}
}
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class TimeSeriesGroupingIteratorTest extends IteratorTestBase {

private static final Logger log = LoggerFactory.getLogger(TimeSeriesGroupingIteratorTest.class);

private TreeMap<Key,Value> table = new TreeMap<>();
private TreeMap<Key, Value> table = new TreeMap<>();
private static final List<Tag> tags = new ArrayList<>();
static {
tags.add(new Tag("rack", "r1"));
Expand All @@ -35,12 +35,13 @@ public class TimeSeriesGroupingIteratorTest extends IteratorTestBase {
@Before
public void setup() {
table.clear();
long ts = ((System.currentTimeMillis() / 1000) * 1000);
long ts = 1705400000000L; // Fixed timestamp for reproducibility
for (int i = 0; i < 100; i++) {
ts += 1000;
Metric m = new Metric("sys.cpu.user", ts, i * 1.0D, tags);
byte[] row = MetricAdapter.encodeRowKey(m);
Key k = new Key(row, tags.get(0).join().getBytes(StandardCharsets.UTF_8), MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Key k = new Key(row, tags.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v = new Value(MetricAdapter.encodeValue(m.getValue().getMeasure()));
table.put(k, v);
}
Expand All @@ -56,7 +57,7 @@ public void testMovingAverage() throws Exception {
iter.seek(new Range(), EMPTY_COL_FAMS, true);

for (int i = 4; i < 100; i++) {
checkNextResult(iter, new double[] {i - 4, i - 3, i - 2, i - 1, i});
checkNextResult(iter, new double[] { i - 4, i - 3, i - 2, i - 1, i });
}
assertFalse(iter.hasTop());
}
Expand Down Expand Up @@ -89,12 +90,14 @@ public void testMultipleTimeSeriesMovingAverage() throws Exception {
ts += 1000;
Metric m = new Metric("sys.cpu.user", ts, i * 1.0D, tags1);
byte[] row = MetricAdapter.encodeRowKey(m);
Key k = new Key(row, tags1.get(0).join().getBytes(StandardCharsets.UTF_8), MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Key k = new Key(row, tags1.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v = new Value(MetricAdapter.encodeValue(m.getValue().getMeasure()));
table.put(k, v);
Metric m2 = new Metric("sys.cpu.user", ts, i * 2.0D, tags2);
byte[] row2 = MetricAdapter.encodeRowKey(m2);
Key k2 = new Key(row2, tags2.get(0).join().getBytes(StandardCharsets.UTF_8), MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Key k2 = new Key(row2, tags2.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v2 = new Value(MetricAdapter.encodeValue(m2.getValue().getMeasure()));
table.put(k2, v2);
}
Expand All @@ -108,10 +111,10 @@ public void testMultipleTimeSeriesMovingAverage() throws Exception {
// this section changed when the key structure changed so that identical
// colFam values sorted consecutively within an given time period
for (int i = 4; i < 100; i++) {
checkNextResult(iter, new double[] {i - 4, i - 3, i - 2, i - 1, i});
checkNextResult(iter, new double[] { i - 4, i - 3, i - 2, i - 1, i });
}
for (int i = 4; i < 100; i++) {
checkNextResult(iter, new double[] {(i - 4) * 2, (i - 3) * 2, (i - 2) * 2, (i - 1) * 2, i * 2});
checkNextResult(iter, new double[] { (i - 4) * 2, (i - 3) * 2, (i - 2) * 2, (i - 1) * 2, i * 2 });
}
assertFalse(iter.hasTop());

Expand All @@ -129,14 +132,16 @@ public void testTimeSeriesDropOff() throws Exception {
ts += 1000;
Metric m = new Metric("sys.cpu.user", ts, i * 1.0D, tags1);
byte[] row = MetricAdapter.encodeRowKey(m);
Key k = new Key(row, tags1.get(0).join().getBytes(StandardCharsets.UTF_8), MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Key k = new Key(row, tags1.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v = new Value(MetricAdapter.encodeValue(m.getValue().getMeasure()));
table.put(k, v);
if (i < 50) {
// only populate this series 50 times
Metric m2 = new Metric("sys.cpu.user", ts, i * 2.0D, tags2);
byte[] row2 = MetricAdapter.encodeRowKey(m2);
Key k2 = new Key(row2, tags2.get(0).join().getBytes(StandardCharsets.UTF_8), MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Key k2 = new Key(row2, tags2.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v2 = new Value(MetricAdapter.encodeValue(m2.getValue().getMeasure()));
table.put(k2, v2);
}
Expand All @@ -152,10 +157,10 @@ public void testTimeSeriesDropOff() throws Exception {
// this section changed when the key structure changed so that identical
// colFam values sorted consecutively within a given time period
for (int i = 4; i < 100; i++) {
checkNextResult(iter, new double[] {i - 4, i - 3, i - 2, i - 1, i});
checkNextResult(iter, new double[] { i - 4, i - 3, i - 2, i - 1, i });
}
for (int i = 4; i < 50; i++) {
checkNextResult(iter, new double[] {(i - 4) * 2, (i - 3) * 2, (i - 2) * 2, (i - 1) * 2, i * 2});
checkNextResult(iter, new double[] { (i - 4) * 2, (i - 3) * 2, (i - 2) * 2, (i - 1) * 2, i * 2 });
}
assertFalse(iter.hasTop());
}
Expand All @@ -172,14 +177,16 @@ public void testAdditionalTimeSeries() throws Exception {
ts += 1000;
Metric m = new Metric("sys.cpu.user", ts, i * 1.0D, tags1);
byte[] row = MetricAdapter.encodeRowKey(m);
Key k = new Key(row, tags1.get(0).join().getBytes(StandardCharsets.UTF_8), MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Key k = new Key(row, tags1.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v = new Value(MetricAdapter.encodeValue(m.getValue().getMeasure()));
table.put(k, v);
if (i > 50) {
// only populate this series 50 times
Metric m2 = new Metric("sys.cpu.user", ts, i * 2.0D, tags2);
byte[] row2 = MetricAdapter.encodeRowKey(m2);
Key k2 = new Key(row2, tags2.get(0).join().getBytes(StandardCharsets.UTF_8), MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Key k2 = new Key(row2, tags2.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v2 = new Value(MetricAdapter.encodeValue(m2.getValue().getMeasure()));
table.put(k2, v2);
}
Expand All @@ -194,10 +201,10 @@ public void testAdditionalTimeSeries() throws Exception {
// this section changed when the key structure changed so that identical
// colFam values sorted consecutively within an given time period
for (int i = 4; i < 100; i++) {
checkNextResult(iter, new double[] {i - 4, i - 3, i - 2, i - 1, i});
checkNextResult(iter, new double[] { i - 4, i - 3, i - 2, i - 1, i });
}
for (int i = 55; i < 100; i++) {
checkNextResult(iter, new double[] {(i - 4) * 2, (i - 3) * 2, (i - 2) * 2, (i - 1) * 2, i * 2});
checkNextResult(iter, new double[] { (i - 4) * 2, (i - 3) * 2, (i - 2) * 2, (i - 1) * 2, i * 2 });
}

assertFalse(iter.hasTop());
Expand All @@ -218,18 +225,21 @@ public void testManySparseTimeSeries() throws Exception {
ts += 1000;
Metric m = new Metric("sys.cpu.user", ts, i * 1.0D, tags1);
byte[] row = MetricAdapter.encodeRowKey(m);
Key k = new Key(row, tags1.get(0).join().getBytes(StandardCharsets.UTF_8), MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Key k = new Key(row, tags1.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v = new Value(MetricAdapter.encodeValue(m.getValue().getMeasure()));
table.put(k, v);
// jitter the time on the second time series
Metric m2 = new Metric("sys.cpu.user", ts + 50, i * 2.0D, tags2);
byte[] row2 = MetricAdapter.encodeRowKey(m2);
Key k2 = new Key(row2, tags2.get(0).join().getBytes(StandardCharsets.UTF_8), MetricAdapter.encodeColQual(ts, ""), new byte[0], ts + 50);
Key k2 = new Key(row2, tags2.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts + 50);
Value v2 = new Value(MetricAdapter.encodeValue(m2.getValue().getMeasure()));
table.put(k2, v2);
Metric m3 = new Metric("sys.cpu.user", ts, i * 3.0D, tags3);
byte[] row3 = MetricAdapter.encodeRowKey(m3);
Key k3 = new Key(row3, tags3.get(0).join().getBytes(StandardCharsets.UTF_8), MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Key k3 = new Key(row3, tags3.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v3 = new Value(MetricAdapter.encodeValue(m3.getValue().getMeasure()));
table.put(k3, v3);
}
Expand Down Expand Up @@ -286,7 +296,8 @@ private void checkNextResult(TimeSeriesGroupingIterator iter, double[] expectedV
iter.next();
}

private void checkNextResult(TimeSeriesGroupingIterator iter, LinkedList<Double> expectedValues) throws IOException {
private void checkNextResult(TimeSeriesGroupingIterator iter, LinkedList<Double> expectedValues)
throws IOException {
assertTrue(iter.hasTop());
log.trace("Expected: {}", expectedValues);
log.trace("Getting value for Key {}", iter.getTopKey());
Expand Down