diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml index 5440afcbcf..e905670d79 100644 --- a/.github/workflows/core-hadoop3-ci.yml +++ b/.github/workflows/core-hadoop3-ci.yml @@ -37,8 +37,15 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - jdk: [ '8', '11' ] + jdk: [ 8, 11, 17 ] spark: [ '3.3', '3.5' ] + exclude: + - jdk: 8 + spark: '3.5' + - jdk: 11 + spark: '3.5' + - jdk: 17 + spark: '3.3' name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }} steps: - uses: actions/checkout@v3 diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/view/KeyedTableDataView.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/view/KeyedTableDataView.java index 804591b68a..2b70376d60 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/view/KeyedTableDataView.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/view/KeyedTableDataView.java @@ -38,10 +38,12 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.StructLikeMap; import org.apache.iceberg.util.StructLikeSet; import java.io.IOException; +import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.HashMap; @@ -61,7 +63,7 @@ public class KeyedTableDataView extends AbstractTableDataView { private final StructLikeMap view; - private final List changeLog = new ArrayList<>(); + private final List changeLog = new ArrayList<>(); private final RandomRecordGenerator generator; @@ -129,11 +131,11 @@ public KeyedTableDataView( public WriteResult append(int count) throws IOException { Preconditions.checkArgument(count <= primaryUpperBound - view.size()); - List records = new ArrayList<>(); + List records = new ArrayList<>(); for (int i = 0; i < primaryUpperBound; i++) { Record record = generator.randomRecord(i); if (!view.containsKey(record)) { - records.add(new RecordWithAction(record, ChangeAction.INSERT)); + records.add(new TestRecordWithAction(record, ChangeAction.INSERT)); } if (records.size() == count) { break; @@ -144,29 +146,29 @@ public WriteResult append(int count) throws IOException { public WriteResult upsert(int count) throws IOException { List scatter = randomRecord(count); - List upsert = new ArrayList<>(); + List upsert = new ArrayList<>(); for (Record record : scatter) { - upsert.add(new RecordWithAction(record, ChangeAction.DELETE)); - upsert.add(new RecordWithAction(record, ChangeAction.INSERT)); + upsert.add(new TestRecordWithAction(record, ChangeAction.DELETE)); + upsert.add(new TestRecordWithAction(record, ChangeAction.INSERT)); } return doWrite(upsert); } public WriteResult cdc(int count) throws IOException { List scatter = randomRecord(count); - List cdc = new ArrayList<>(); + List cdc = new ArrayList<>(); for (Record record : scatter) { if (view.containsKey(record)) { if (random.nextBoolean()) { // delete - cdc.add(new RecordWithAction(view.get(record), ChangeAction.DELETE)); + cdc.add(new TestRecordWithAction(view.get(record), ChangeAction.DELETE)); } else { // update - cdc.add(new RecordWithAction(view.get(record), ChangeAction.UPDATE_BEFORE)); - cdc.add(new RecordWithAction(record, ChangeAction.UPDATE_AFTER)); + cdc.add(new TestRecordWithAction(view.get(record), ChangeAction.UPDATE_BEFORE)); + cdc.add(new TestRecordWithAction(record, ChangeAction.UPDATE_AFTER)); } } else { - cdc.add(new RecordWithAction(record, ChangeAction.DELETE)); + cdc.add(new TestRecordWithAction(record, ChangeAction.DELETE)); } } return doWrite(cdc); @@ -174,9 +176,9 @@ public WriteResult cdc(int count) throws IOException { public WriteResult onlyDelete(int count) throws IOException { List scatter = randomRecord(count); - List delete = + List delete = scatter.stream() - .map(s -> new RecordWithAction(s, ChangeAction.DELETE)) + .map(s -> new TestRecordWithAction(s, ChangeAction.DELETE)) .collect(Collectors.toList()); return doWrite(delete); } @@ -188,10 +190,10 @@ public WriteResult custom(CustomData customData) throws IOException { } public WriteResult custom(List data) throws IOException { - List records = new ArrayList<>(); + List records = new ArrayList<>(); for (PKWithAction pkWithAction : data) { records.add( - new RecordWithAction(generator.randomRecord(pkWithAction.pk), pkWithAction.action)); + new TestRecordWithAction(generator.randomRecord(pkWithAction.pk), pkWithAction.action)); } return doWrite(records); } @@ -238,9 +240,9 @@ public MatchResult match(List records) { return MatchResult.of(notInView, inViewButDuplicate, missInView); } - private WriteResult doWrite(List upsert) throws IOException { + private WriteResult doWrite(List upsert) throws IOException { writeView(upsert); - WriteResult writeResult = writeFile(upsert); + WriteResult writeResult = writeFile(new ArrayList(upsert)); upsertCommit(writeResult); return writeResult; } @@ -299,8 +301,8 @@ private void upsertCommit(WriteResult writeResult) { } } - private void writeView(List records) { - for (RecordWithAction record : records) { + private void writeView(List records) { + for (TestRecordWithAction record : records) { changeLog.add(record); ChangeAction action = record.getAction(); if (action == ChangeAction.DELETE || action == ChangeAction.UPDATE_BEFORE) { @@ -339,4 +341,26 @@ protected final boolean alreadyExists(Record record) { return view.containsKey(record); } } + + public static class TestRecordWithAction extends RecordWithAction { + public TestRecordWithAction(Record record, ChangeAction action) { + super(record, action); + } + + @Override + public T get(int pos, Class javaClass) { + Object value = get(pos); + if (value instanceof LocalDateTime && javaClass == Long.class) { + @SuppressWarnings("unchecked") + T result = (T) (Long) DateTimeUtil.microsFromTimestamp((LocalDateTime) value); + return result; + } else if (value instanceof OffsetDateTime && javaClass == Long.class) { + @SuppressWarnings("unchecked") + T result = + (T) (Long) DateTimeUtil.microsFromTimestamp(((OffsetDateTime) value).toLocalDateTime()); + return result; + } + return super.get(pos, javaClass); + } + } } diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml index c71541c3ff..17c26486fb 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml @@ -426,7 +426,6 @@ org.apache.amoro.listener.AmoroRunListener - -verbose:class diff --git a/pom.xml b/pom.xml index 8189ece914..6ea3c9972f 100644 --- a/pom.xml +++ b/pom.xml @@ -159,6 +159,27 @@ compile compile provided + + + -XX:+IgnoreUnrecognizedVMOptions + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/sun.nio.cs=ALL-UNNAMED + --add-opens=java.base/sun.security.action=ALL-UNNAMED + --add-opens=java.base/sun.security.tools.keytool=ALL-UNNAMED + --add-opens=java.base/sun.security.x509=ALL-UNNAMED + --add-opens=java.base/sun.util.calendar=ALL-UNNAMED + -Djdk.reflect.useDirectMethodHandle=false + -Dio.netty.tryReflectionSetAccessible=true @@ -1076,6 +1097,9 @@ org.apache.maven.plugins maven-surefire-plugin ${maven-surefire-plugin.version} + + ${argLine} -ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:ReservedCodeCacheSize=128m ${extraJavaTestArgs} -verbose:class + org.apache.maven.plugins @@ -1449,9 +1473,20 @@ [11,) + 11 11 + + java17 + + 17 + + + 17 + 17 + + spark-3.3