From 62dde22df843a5b86d02eb0e942593f8bb1111d4 Mon Sep 17 00:00:00 2001 From: Durgaprasad M L Date: Sat, 30 May 2026 15:45:02 +0530 Subject: [PATCH 1/2] [IO] Implement DeltaIO reader and add Delta Lake perf tests --- sdks/java/io/delta/build.gradle | 9 + .../org/apache/beam/sdk/io/delta/DeltaIO.java | 391 +++++++++++++++++- .../apache/beam/sdk/io/delta/DeltaIOIT.java | 346 ++++++++++++++++ .../io/delta/DeltaIOTestPipelineOptions.java | 32 ++ 4 files changed, 775 insertions(+), 3 deletions(-) create mode 100644 sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOIT.java create mode 100644 sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTestPipelineOptions.java diff --git a/sdks/java/io/delta/build.gradle b/sdks/java/io/delta/build.gradle index 617965b3bc4e..68281a1a7087 100644 --- a/sdks/java/io/delta/build.gradle +++ b/sdks/java/io/delta/build.gradle @@ -36,4 +36,13 @@ dependencies { permitUnusedDeclared library.java.delta_kernel_defaults testImplementation library.java.junit + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation project(path: ":sdks:java:io:common") + testImplementation project(path: ":sdks:java:testing:test-utils") + testImplementation project(path: ":sdks:java:io:parquet") + testImplementation project(path: ":sdks:java:extensions:avro") + testImplementation library.java.hadoop_client + testImplementation library.java.hadoop_common + testImplementation library.java.slf4j_api } + diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java index 6c5df4728b4e..80fa34efd1cb 100644 --- a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java @@ -17,10 +17,53 @@ */ package org.apache.beam.sdk.io.delta; +import static 
io.delta.kernel.internal.util.Utils.singletonCloseableIterator; + import com.google.auto.value.AutoValue; +import io.delta.kernel.Scan; +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.data.ArrayValue; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.MapValue; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.data.ScanStateRow; +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DecimalType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import 
org.apache.beam.sdk.values.Row; @@ -83,9 +126,351 @@ public ReadRows withConfig(Map config) { @Override public PCollection expand(PBegin input) { - // TODO(https://github.com/apache/beam/issues/38551): Implement expansion for - // Delta Lake ReadRows - throw new UnsupportedOperationException("Not implemented yet."); + if (getTablePath() == null) { + throw new IllegalArgumentException("Table path must be set."); + } + + org.apache.hadoop.conf.Configuration hadoopConfig = getHadoopConfiguration(getHadoopConfig()); + Engine engine = DefaultEngine.create(hadoopConfig); + Table table = Table.forPath(engine, getTablePath()); + + try { + Snapshot snapshot = buildSnapshot(engine, table); + Scan scan = snapshot.getScanBuilder(engine).build(); + StructType readSchema = scan.getSchema(engine); + org.apache.beam.sdk.schemas.Schema beamSchema = inferBeamSchema(readSchema); + + List fileDescriptors = buildFileDescriptors(engine, scan); + + return input + .apply("CreateFileDescriptors", Create.of(fileDescriptors) + .withCoder(SerializableCoder.of(DeltaFileDescriptor.class))) + .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema))) + .setRowSchema(beamSchema); + + } catch (Exception e) { + throw new RuntimeException("Failed to read Delta table: " + getTablePath(), e); + } + } + + private Snapshot buildSnapshot(Engine engine, Table table) throws Exception { + if (getVersion() != null) { + return table.getSnapshotAsOfVersion(engine, getVersion()); + } else if (getTimestamp() != null) { + long epochMillis = org.joda.time.Instant.parse(getTimestamp()).getMillis(); + return table.getSnapshotAsOfTimestamp(engine, epochMillis); + } else { + return table.getLatestSnapshot(engine); + } + } + + private List buildFileDescriptors(Engine engine, Scan scan) throws Exception { + List descriptors = new ArrayList<>(); + try (CloseableIterator scanFileIter = scan.getScanFiles(engine)) { + while (scanFileIter.hasNext()) { + FilteredColumnarBatch scanFilesBatch = scanFileIter.next(); + try 
(CloseableIterator scanFileRows = scanFilesBatch.getRows()) { + while (scanFileRows.hasNext()) { + io.delta.kernel.data.Row scanFileRow = scanFileRows.next(); + FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow); + descriptors.add(new DeltaFileDescriptor( + getTablePath(), + fileStatus.getPath(), + fileStatus.getSize(), + fileStatus.getModificationTime(), + getHadoopConfig(), + getVersion(), + getTimestamp() + )); + } + } + } + } + return descriptors; + } + } + + public static class DeltaFileDescriptor implements Serializable { + private final String tablePath; + private final String filePath; + private final long fileSize; + private final long modificationTime; + private final @Nullable Map hadoopConfig; + private final @Nullable Long version; + private final @Nullable String timestamp; + + public DeltaFileDescriptor( + String tablePath, + String filePath, + long fileSize, + long modificationTime, + @Nullable Map hadoopConfig, + @Nullable Long version, + @Nullable String timestamp) { + this.tablePath = tablePath; + this.filePath = filePath; + this.fileSize = fileSize; + this.modificationTime = modificationTime; + this.hadoopConfig = hadoopConfig; + this.version = version; + this.timestamp = timestamp; + } + + public String getTablePath() { + return tablePath; + } + + public String getFilePath() { + return filePath; + } + + public long getFileSize() { + return fileSize; + } + + public long getModificationTime() { + return modificationTime; + } + + public @Nullable Map getHadoopConfig() { + return hadoopConfig; + } + + public @Nullable Long getVersion() { + return version; + } + + public @Nullable String getTimestamp() { + return timestamp; + } + } + + public static class ReadFileFn extends DoFn { + private final org.apache.beam.sdk.schemas.Schema beamSchema; + + public ReadFileFn(org.apache.beam.sdk.schemas.Schema beamSchema) { + this.beamSchema = beamSchema; + } + + @ProcessElement + public void processElement(ProcessContext c) throws 
Exception { + DeltaFileDescriptor desc = c.element(); + org.apache.hadoop.conf.Configuration hadoopConfig = getHadoopConfiguration(desc.getHadoopConfig()); + Engine engine = DefaultEngine.create(hadoopConfig); + Table table = Table.forPath(engine, desc.getTablePath()); + + Snapshot snapshot; + if (desc.getVersion() != null) { + snapshot = table.getSnapshotAsOfVersion(engine, desc.getVersion()); + } else if (desc.getTimestamp() != null) { + long epochMillis = org.joda.time.Instant.parse(desc.getTimestamp()).getMillis(); + snapshot = table.getSnapshotAsOfTimestamp(engine, epochMillis); + } else { + snapshot = table.getLatestSnapshot(engine); + } + + Scan scan = snapshot.getScanBuilder(engine).build(); + io.delta.kernel.data.Row scanState = scan.getScanState(engine); + + try (CloseableIterator scanFileIter = scan.getScanFiles(engine)) { + while (scanFileIter.hasNext()) { + FilteredColumnarBatch scanFilesBatch = scanFileIter.next(); + try (CloseableIterator scanFileRows = scanFilesBatch.getRows()) { + while (scanFileRows.hasNext()) { + io.delta.kernel.data.Row scanFileRow = scanFileRows.next(); + FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow); + if (fileStatus.getPath().equals(desc.getFilePath())) { + StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(scanState); + CloseableIterator physicalDataIter = + engine.getParquetHandler().readParquetFiles( + singletonCloseableIterator(fileStatus), + physicalReadSchema, + Optional.empty()).map(FilteredColumnarBatch::getData); + try ( + CloseableIterator transformedData = + Scan.transformPhysicalData( + engine, + scanState, + scanFileRow, + physicalDataIter)) { + while (transformedData.hasNext()) { + FilteredColumnarBatch filteredData = transformedData.next(); + try (CloseableIterator rows = filteredData.getRows()) { + while (rows.hasNext()) { + io.delta.kernel.data.Row row = rows.next(); + c.output(convertKernelRowToBeamRow(row, beamSchema)); + } + } + } + } + } + } + } + } + } + 
} + } + + private static org.apache.hadoop.conf.Configuration getHadoopConfiguration( + @Nullable Map configMap) { + org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration(); + if (configMap != null) { + for (Map.Entry entry : configMap.entrySet()) { + config.set(entry.getKey(), entry.getValue()); + } + } + return config; + } + + private static org.apache.beam.sdk.schemas.Schema inferBeamSchema(StructType structType) { + org.apache.beam.sdk.schemas.Schema.Builder builder = org.apache.beam.sdk.schemas.Schema.builder(); + for (StructField field : structType.fields()) { + builder.addField(field.getName(), toBeamFieldType(field.getDataType())); + } + return builder.build(); + } + + private static org.apache.beam.sdk.schemas.Schema.FieldType toBeamFieldType(DataType dataType) { + if (dataType instanceof IntegerType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.INT32; + } else if (dataType instanceof LongType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.INT64; + } else if (dataType instanceof StringType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.STRING; + } else if (dataType instanceof DoubleType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; + } else if (dataType instanceof FloatType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; + } else if (dataType instanceof BooleanType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; + } else if (dataType instanceof ShortType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.INT16; + } else if (dataType instanceof ByteType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.BYTE; + } else if (dataType instanceof BinaryType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.BYTES; + } else if (dataType instanceof DecimalType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.DECIMAL; + } else if (dataType instanceof TimestampType || dataType instanceof DateType) { + return 
org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME; + } else if (dataType instanceof StructType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.row(inferBeamSchema((StructType) dataType)); + } else if (dataType instanceof ArrayType) { + return org.apache.beam.sdk.schemas.Schema.FieldType.array(toBeamFieldType(((ArrayType) dataType).getElementType())); + } else if (dataType instanceof MapType) { + MapType mapType = (MapType) dataType; + return org.apache.beam.sdk.schemas.Schema.FieldType.map( + toBeamFieldType(mapType.getKeyType()), + toBeamFieldType(mapType.getValueType())); + } else { + throw new IllegalArgumentException("Unsupported Delta type: " + dataType); + } + } + + private static Row convertKernelRowToBeamRow( + io.delta.kernel.data.Row deltaRow, org.apache.beam.sdk.schemas.Schema beamSchema) { + List values = new ArrayList<>(); + StructType structType = deltaRow.getSchema(); + for (int i = 0; i < structType.length(); i++) { + if (deltaRow.isNullAt(i)) { + values.add(null); + } else { + DataType dataType = structType.at(i).getDataType(); + values.add(convertValue(deltaRow, i, dataType)); + } + } + return Row.withSchema(beamSchema).addValues(values).build(); + } + + private static Object convertValue(io.delta.kernel.data.Row deltaRow, int ordinal, DataType dataType) { + if (deltaRow.isNullAt(ordinal)) { + return null; + } + if (dataType instanceof IntegerType) { + return deltaRow.getInt(ordinal); + } else if (dataType instanceof LongType) { + return deltaRow.getLong(ordinal); + } else if (dataType instanceof StringType) { + return deltaRow.getString(ordinal); + } else if (dataType instanceof DoubleType) { + return deltaRow.getDouble(ordinal); + } else if (dataType instanceof FloatType) { + return deltaRow.getFloat(ordinal); + } else if (dataType instanceof BooleanType) { + return deltaRow.getBoolean(ordinal); + } else if (dataType instanceof ShortType) { + return deltaRow.getShort(ordinal); + } else if (dataType instanceof ByteType) { + return 
deltaRow.getByte(ordinal); + } else if (dataType instanceof BinaryType) { + return deltaRow.getBinary(ordinal); + } else if (dataType instanceof DecimalType) { + return deltaRow.getDecimal(ordinal); + } else if (dataType instanceof TimestampType) { + long micros = deltaRow.getLong(ordinal); + return org.joda.time.Instant.ofEpochMilli(micros / 1000); + } else if (dataType instanceof DateType) { + int days = deltaRow.getInt(ordinal); + return org.joda.time.Instant.ofEpochMilli(days * 24L * 60 * 60 * 1000); + } else if (dataType instanceof StructType) { + io.delta.kernel.data.Row structRow = deltaRow.getStruct(ordinal); + return convertKernelRowToBeamRow(structRow, inferBeamSchema((StructType) dataType)); + } else if (dataType instanceof ArrayType) { + ArrayValue arrayVal = deltaRow.getArray(ordinal); + int size = arrayVal.getSize(); + List list = new ArrayList<>(size); + DataType elemType = ((ArrayType) dataType).getElementType(); + ColumnVector vec = arrayVal.getElements(); + for (int j = 0; j < size; j++) { + if (vec.isNullAt(j)) { + list.add(null); + } else { + list.add(convertVectorValue(vec, j, elemType)); + } + } + return list; + } else if (dataType instanceof MapType) { + MapValue mapVal = deltaRow.getMap(ordinal); + int size = mapVal.getSize(); + Map map = new HashMap<>(size); + DataType keyType = ((MapType) dataType).getKeyType(); + DataType valueType = ((MapType) dataType).getValueType(); + ColumnVector keysVec = mapVal.getKeys(); + ColumnVector valuesVec = mapVal.getValues(); + for (int j = 0; j < size; j++) { + Object key = convertVectorValue(keysVec, j, keyType); + Object val = valuesVec.isNullAt(j) ? 
null : convertVectorValue(valuesVec, j, valueType); + map.put(key, val); + } + return map; + } else { + return deltaRow.toString(); + } + } + + private static Object convertVectorValue(ColumnVector vec, int rowId, DataType dataType) { + if (dataType instanceof IntegerType) { + return vec.getInt(rowId); + } else if (dataType instanceof LongType) { + return vec.getLong(rowId); + } else if (dataType instanceof StringType) { + return vec.getString(rowId); + } else if (dataType instanceof DoubleType) { + return vec.getDouble(rowId); + } else if (dataType instanceof FloatType) { + return vec.getFloat(rowId); + } else if (dataType instanceof BooleanType) { + return vec.getBoolean(rowId); + } else if (dataType instanceof ShortType) { + return vec.getShort(rowId); + } else if (dataType instanceof ByteType) { + return vec.getByte(rowId); + } else if (dataType instanceof BinaryType) { + return vec.getBinary(rowId); + } else if (dataType instanceof DecimalType) { + return vec.getDecimal(rowId); + } else { + return vec.toString(); } } } diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOIT.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOIT.java new file mode 100644 index 000000000000..b2af9ef17152 --- /dev/null +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOIT.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import static org.junit.Assert.assertNotEquals; + +import java.io.PrintWriter; +import java.io.Serializable; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.io.DefaultFilenamePolicy; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.io.common.IOITHelper; +import org.apache.beam.sdk.io.common.TestRow; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MimeTypes; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testutils.NamedTestResult; +import org.apache.beam.sdk.testutils.metrics.IOITMetrics; +import org.apache.beam.sdk.testutils.metrics.MetricsReader; +import org.apache.beam.sdk.testutils.metrics.TimeMonitor; +import 
org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Performance and Integration tests for {@link DeltaIO}. + * + *

<p>Run this test using the command below: + * + * <pre>
+ *  ./gradlew :sdks:java:io:delta:test --tests org.apache.beam.sdk.io.delta.DeltaIOIT
+ * </pre>
+ */ +@RunWith(JUnit4.class) +public class DeltaIOIT implements Serializable { + + private static final Schema AVRO_SCHEMA = + new Schema.Parser() + .parse( + "{\n" + + " \"namespace\": \"ioitdelta\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRowRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\"},\n" + + " {\"name\": \"name\", \"type\": \"string\"}\n" + + " ]\n" + + "}"); + + private static final Schema PARTITIONED_AVRO_SCHEMA = + new Schema.Parser() + .parse( + "{\n" + + " \"namespace\": \"ioitdelta\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRowRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\"},\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"part\", \"type\": \"string\"}\n" + + " ]\n" + + "}"); + + private static final String NAMESPACE = DeltaIOIT.class.getName(); + + private static String tablePathPrefix; + private static InfluxDBSettings settings; + + @Rule public transient TestPipeline pipelineWrite = TestPipeline.create(); + @Rule public transient TestPipeline pipelineRead = TestPipeline.create(); + + @BeforeClass + public static void setup() { + DeltaIOTestPipelineOptions options = null; + try { + options = IOITHelper.readIOTestPipelineOptions(DeltaIOTestPipelineOptions.class); + } catch (IllegalArgumentException e) { + // In local environments, fall back to target directory if not provided + } + if (options != null) { + tablePathPrefix = options.getTablePath(); + settings = + InfluxDBSettings.builder() + .withHost(options.getInfluxHost()) + .withDatabase(options.getInfluxDatabase()) + .withMeasurement(options.getInfluxMeasurement()) + .get(); + } else { + tablePathPrefix = "target/temp-delta-table"; + settings = null; + } + } + + @Test + public void testReadSmall() throws Exception { + runIntegrationTest(1000, false, "small"); + } + + @Test + public void testReadLarge() throws Exception { + runIntegrationTest(100000, false, "large"); + } + + 
@Test + public void testReadPartitioned() throws Exception { + runIntegrationTest(1000, true, "partitioned"); + } + + private void runIntegrationTest(int numRecords, boolean isPartitioned, String scenarioName) throws Exception { + String tablePath = appendTimestampSuffix(tablePathPrefix + "-" + scenarioName); + try { + // 1. Write Parquet files using Beam + writeParquetFiles(tablePath, numRecords, isPartitioned); + + // 2. Generate Delta Log + generateDeltaLog(tablePath, isPartitioned); + + // 3. Read Delta Table and assert + readAndVerify(tablePath, numRecords, isPartitioned, scenarioName); + + } finally { + cleanUp(tablePath); + } + } + + private static String appendTimestampSuffix(String text) { + return String.format("%s_%s", text, java.time.Instant.now().toEpochMilli()); + } + + private void writeParquetFiles(String tablePath, int numRecords, boolean isPartitioned) { + if (isPartitioned) { + pipelineWrite + .apply("Generate sequence", GenerateSequence.from(0).to(numRecords)) + .apply("Construct TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())) + .apply("Convert to Partitioned Avro", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + TestRow row = c.element(); + String part = (row.id() % 2 == 0) ? 
"even" : "odd"; + c.output(new GenericRecordBuilder(PARTITIONED_AVRO_SCHEMA) + .set("id", row.id()) + .set("name", row.name()) + .set("part", part) + .build()); + } + })) + .setCoder(AvroCoder.of(PARTITIONED_AVRO_SCHEMA)) + .apply("Write Partitioned Parquet", FileIO.writeDynamic() + .by(record -> "part=" + record.get("part")) + .via(ParquetIO.sink(PARTITIONED_AVRO_SCHEMA)) + .to(tablePath) + .withNaming(key -> DefaultFilenamePolicy.fromStandardNaming( + key + "/part", null, ".parquet", false))); + } else { + pipelineWrite + .apply("Generate sequence", GenerateSequence.from(0).to(numRecords)) + .apply("Construct TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())) + .apply("Convert to Avro", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + TestRow row = c.element(); + c.output(new GenericRecordBuilder(AVRO_SCHEMA) + .set("id", row.id()) + .set("name", row.name()) + .build()); + } + })) + .setCoder(AvroCoder.of(AVRO_SCHEMA)) + .apply("Write Parquet files", FileIO.write() + .via(ParquetIO.sink(AVRO_SCHEMA)) + .to(tablePath) + .withNumShards(2)); + } + pipelineWrite.run().waitUntilFinish(); + } + + private void generateDeltaLog(String tablePath, boolean isPartitioned) throws Exception { + List metadataList = FileSystems.match(tablePath + "/**/*.parquet").metadata(); + ResourceId logFileResourceId = FileSystems.matchNewResource(tablePath + "/_delta_log/00000000000000000000.json", false); + + try (PrintWriter writer = new PrintWriter( + Channels.newWriter(FileSystems.create(logFileResourceId, MimeTypes.TEXT), "UTF-8"))) { + writer.println("{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":1}}"); + if (isPartitioned) { + 
writer.println("{\"metaData\":{\"id\":\"test-uuid-partitioned\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"id\\\",\\\"type\\\":\\\"integer\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"part\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionBy\":[\"part\"],\"configuration\":{},\"createdTime\":1717081200000}}"); + } else { + writer.println("{\"metaData\":{\"id\":\"test-uuid-nonpartitioned\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"id\\\",\\\"type\\\":\\\"integer\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionBy\":[],\"configuration\":{},\"createdTime\":1717081200000}}"); + } + + for (MatchResult.Metadata metadata : metadataList) { + String fullPath = metadata.resourceId().toString(); + String tableRoot = FileSystems.matchNewResource(tablePath, true).toString(); + String relativePath = fullPath.substring(tableRoot.length()); + + if (relativePath.startsWith("/")) { + relativePath = relativePath.substring(1); + } + + String addAction; + if (isPartitioned) { + String partValue = relativePath.contains("part=even") ? 
"even" : "odd"; + addAction = String.format( + "{\"add\":{\"path\":\"%s\",\"partitionValues\":{\"part\":\"%s\"},\"size\":%d,\"modificationTime\":%d,\"dataChange\":true}}", + relativePath, partValue, metadata.sizeBytes(), metadata.lastModifiedMillis()); + } else { + addAction = String.format( + "{\"add\":{\"path\":\"%s\",\"partitionValues\":{},\"size\":%d,\"modificationTime\":%d,\"dataChange\":true}}", + relativePath, metadata.sizeBytes(), metadata.lastModifiedMillis()); + } + writer.println(addAction); + } + } + } + + private void readAndVerify(String tablePath, int numRecords, boolean isPartitioned, String scenarioName) { + PCollection deltaRows = + pipelineRead.apply("Read from Delta", DeltaIO.readRows().from(tablePath)); + + PCollection monitoredRows = + deltaRows.apply("TimeMonitor", ParDo.of(new TimeMonitor<>(NAMESPACE, scenarioName + "_read"))); + + PCollection namesAndIds; + if (isPartitioned) { + namesAndIds = monitoredRows.apply("Convert to TestRow", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + org.apache.beam.sdk.values.Row r = c.element(); + int id = r.getInt32("id"); + String name = r.getString("name"); + String part = r.getString("part"); + String expectedPart = (id % 2 == 0) ? 
"even" : "odd"; + org.junit.Assert.assertEquals(expectedPart, part); + c.output(TestRow.create(id, name)); + } + })); + } else { + namesAndIds = monitoredRows.apply("Convert to TestRow", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + org.apache.beam.sdk.values.Row r = c.element(); + c.output(TestRow.create(r.getInt32("id"), r.getString("name"))); + } + })); + } + + PCollection consolidatedHashcode = + namesAndIds + .apply(ParDo.of(new TestRow.SelectNameFn())) + .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults()); + + PAssert.that(consolidatedHashcode) + .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numRecords)); + + PipelineResult result = pipelineRead.run(); + PipelineResult.State pipelineState = result.waitUntilFinish(); + assertNotEquals(PipelineResult.State.FAILED, pipelineState); + + collectAndPublishMetrics(result, scenarioName + "_read", numRecords); + } + + private void collectAndPublishMetrics(PipelineResult readResult, String metricName, int records) { + if (settings == null) { + return; + } + String uuid = UUID.randomUUID().toString(); + String timestamp = java.time.Instant.now().toString(); + + Set> suppliers = new HashSet<>(); + suppliers.add( + reader -> { + long start = reader.getStartTimeMetric(metricName); + long end = reader.getEndTimeMetric(metricName); + double duration = (end - start) / 1e3; + return NamedTestResult.create(uuid, timestamp, metricName + "_duration_sec", duration); + }); + suppliers.add( + reader -> { + long start = reader.getStartTimeMetric(metricName); + long end = reader.getEndTimeMetric(metricName); + double duration = (end - start) / 1e3; + double throughput = duration > 0 ? 
records / duration : 0.0; + return NamedTestResult.create(uuid, timestamp, metricName + "_throughput_ops_sec", throughput); + }); + + IOITMetrics metrics = new IOITMetrics(suppliers, readResult, NAMESPACE, uuid, timestamp); + metrics.publishToInflux(settings); + } + + private void cleanUp(String tablePath) { + try { + MatchResult matchResult = FileSystems.match(tablePath + "/**"); + List resourceIds = new ArrayList<>(); + for (MatchResult.Metadata metadata : matchResult.metadata()) { + resourceIds.add(metadata.resourceId()); + } + if (!resourceIds.isEmpty()) { + FileSystems.delete(resourceIds); + } + FileSystems.delete(Collections.singletonList(FileSystems.matchNewResource(tablePath, true))); + } catch (Exception e) { + // Ignore cleanup failures in test + } + } +} diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTestPipelineOptions.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTestPipelineOptions.java new file mode 100644 index 000000000000..b28d4fe03f08 --- /dev/null +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTestPipelineOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.delta; + +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.Validation; + +/** Pipeline options for DeltaIO integration and performance tests. */ +public interface DeltaIOTestPipelineOptions extends IOTestPipelineOptions { + + @Description("Folder path where the Delta table will be created") + @Validation.Required + String getTablePath(); + + void setTablePath(String value); +} From ed6e3f764d51a94906e6a035c1e784701ed46df7 Mon Sep 17 00:00:00 2001 From: Durgaprasad M L Date: Sat, 30 May 2026 16:10:37 +0530 Subject: [PATCH 2/2] [IO] Refactor DeltaIO runtime planning and type handling --- .../org/apache/beam/sdk/io/delta/DeltaIO.java | 790 ++++++++++++++++-- 1 file changed, 715 insertions(+), 75 deletions(-) diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java index 80fa34efd1cb..400e554ed6e4 100644 --- a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java @@ -59,6 +59,7 @@ import java.util.Map; import java.util.Optional; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -134,23 +135,20 @@ public PCollection expand(PBegin input) { Engine engine = DefaultEngine.create(hadoopConfig); Table table = Table.forPath(engine, getTablePath()); + StructType tableSchema; try { Snapshot snapshot = buildSnapshot(engine, table); - Scan scan = snapshot.getScanBuilder(engine).build(); - StructType readSchema = scan.getSchema(engine); - org.apache.beam.sdk.schemas.Schema beamSchema = inferBeamSchema(readSchema); - - List fileDescriptors = 
buildFileDescriptors(engine, scan); - - return input - .apply("CreateFileDescriptors", Create.of(fileDescriptors) - .withCoder(SerializableCoder.of(DeltaFileDescriptor.class))) - .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema))) - .setRowSchema(beamSchema); - + tableSchema = snapshot.getSchema(engine); } catch (Exception e) { - throw new RuntimeException("Failed to read Delta table: " + getTablePath(), e); + throw new RuntimeException("Failed to load schema for Delta table: " + getTablePath(), e); } + org.apache.beam.sdk.schemas.Schema beamSchema = inferBeamSchema(tableSchema); + + return input + .apply("CreateTablePath", Create.of(getTablePath())) + .apply("PlanScan", ParDo.of(new PlanScanFn(getHadoopConfig(), getVersion(), getTimestamp()))) + .apply("ReadFile", ParDo.of(new ReadFileFn(beamSchema))) + .setRowSchema(beamSchema); } private Snapshot buildSnapshot(Engine engine, Table table) throws Exception { @@ -163,9 +161,43 @@ private Snapshot buildSnapshot(Engine engine, Table table) throws Exception { return table.getLatestSnapshot(engine); } } + } + + public static class PlanScanFn extends DoFn { + private final @Nullable Map hadoopConfig; + private final @Nullable Long version; + private final @Nullable String timestamp; + + public PlanScanFn( + @Nullable Map hadoopConfig, + @Nullable Long version, + @Nullable String timestamp) { + this.hadoopConfig = hadoopConfig; + this.version = version; + this.timestamp = timestamp; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + String tablePath = c.element(); + org.apache.hadoop.conf.Configuration config = getHadoopConfiguration(hadoopConfig); + Engine engine = DefaultEngine.create(config); + Table table = Table.forPath(engine, tablePath); + + Snapshot snapshot; + if (version != null) { + snapshot = table.getSnapshotAsOfVersion(engine, version); + } else if (timestamp != null) { + long epochMillis = org.joda.time.Instant.parse(timestamp).getMillis(); + snapshot = 
table.getSnapshotAsOfTimestamp(engine, epochMillis); + } else { + snapshot = table.getLatestSnapshot(engine); + } + + Scan scan = snapshot.getScanBuilder(engine).build(); + io.delta.kernel.data.Row scanState = scan.getScanState(engine); + SerializableRow serializableScanState = new SerializableRow(scanState); - private List buildFileDescriptors(Engine engine, Scan scan) throws Exception { - List descriptors = new ArrayList<>(); try (CloseableIterator scanFileIter = scan.getScanFiles(engine)) { while (scanFileIter.hasNext()) { FilteredColumnarBatch scanFilesBatch = scanFileIter.next(); @@ -173,23 +205,26 @@ private List buildFileDescriptors(Engine engine, Scan scan) while (scanFileRows.hasNext()) { io.delta.kernel.data.Row scanFileRow = scanFileRows.next(); FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow); - descriptors.add(new DeltaFileDescriptor( - getTablePath(), + SerializableRow serializableScanFileRow = new SerializableRow(scanFileRow); + c.output(new DeltaFileDescriptor( + tablePath, fileStatus.getPath(), fileStatus.getSize(), fileStatus.getModificationTime(), - getHadoopConfig(), - getVersion(), - getTimestamp() + hadoopConfig, + version, + timestamp, + serializableScanState, + serializableScanFileRow )); } } } } - return descriptors; } } + @DefaultCoder(SerializableCoder.class) public static class DeltaFileDescriptor implements Serializable { private final String tablePath; private final String filePath; @@ -198,6 +233,8 @@ public static class DeltaFileDescriptor implements Serializable { private final @Nullable Map hadoopConfig; private final @Nullable Long version; private final @Nullable String timestamp; + private final SerializableRow scanState; + private final SerializableRow scanFileRow; public DeltaFileDescriptor( String tablePath, @@ -206,7 +243,9 @@ public DeltaFileDescriptor( long modificationTime, @Nullable Map hadoopConfig, @Nullable Long version, - @Nullable String timestamp) { + @Nullable String timestamp, + 
SerializableRow scanState, + SerializableRow scanFileRow) { this.tablePath = tablePath; this.filePath = filePath; this.fileSize = fileSize; @@ -214,6 +253,8 @@ public DeltaFileDescriptor( this.hadoopConfig = hadoopConfig; this.version = version; this.timestamp = timestamp; + this.scanState = scanState; + this.scanFileRow = scanFileRow; } public String getTablePath() { @@ -243,6 +284,14 @@ public long getModificationTime() { public @Nullable String getTimestamp() { return timestamp; } + + public SerializableRow getScanState() { + return scanState; + } + + public SerializableRow getScanFileRow() { + return scanFileRow; + } } public static class ReadFileFn extends DoFn { @@ -257,53 +306,28 @@ public void processElement(ProcessContext c) throws Exception { DeltaFileDescriptor desc = c.element(); org.apache.hadoop.conf.Configuration hadoopConfig = getHadoopConfiguration(desc.getHadoopConfig()); Engine engine = DefaultEngine.create(hadoopConfig); - Table table = Table.forPath(engine, desc.getTablePath()); - - Snapshot snapshot; - if (desc.getVersion() != null) { - snapshot = table.getSnapshotAsOfVersion(engine, desc.getVersion()); - } else if (desc.getTimestamp() != null) { - long epochMillis = org.joda.time.Instant.parse(desc.getTimestamp()).getMillis(); - snapshot = table.getSnapshotAsOfTimestamp(engine, epochMillis); - } else { - snapshot = table.getLatestSnapshot(engine); - } - Scan scan = snapshot.getScanBuilder(engine).build(); - io.delta.kernel.data.Row scanState = scan.getScanState(engine); + FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(desc.getScanFileRow()); + StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(desc.getScanState()); + CloseableIterator physicalDataIter = + engine.getParquetHandler().readParquetFiles( + singletonCloseableIterator(fileStatus), + physicalReadSchema, + Optional.empty()).map(FilteredColumnarBatch::getData); - try (CloseableIterator scanFileIter = scan.getScanFiles(engine)) { - while 
(scanFileIter.hasNext()) { - FilteredColumnarBatch scanFilesBatch = scanFileIter.next(); - try (CloseableIterator scanFileRows = scanFilesBatch.getRows()) { - while (scanFileRows.hasNext()) { - io.delta.kernel.data.Row scanFileRow = scanFileRows.next(); - FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow); - if (fileStatus.getPath().equals(desc.getFilePath())) { - StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(scanState); - CloseableIterator physicalDataIter = - engine.getParquetHandler().readParquetFiles( - singletonCloseableIterator(fileStatus), - physicalReadSchema, - Optional.empty()).map(FilteredColumnarBatch::getData); - try ( - CloseableIterator transformedData = - Scan.transformPhysicalData( - engine, - scanState, - scanFileRow, - physicalDataIter)) { - while (transformedData.hasNext()) { - FilteredColumnarBatch filteredData = transformedData.next(); - try (CloseableIterator rows = filteredData.getRows()) { - while (rows.hasNext()) { - io.delta.kernel.data.Row row = rows.next(); - c.output(convertKernelRowToBeamRow(row, beamSchema)); - } - } - } - } - } + try ( + CloseableIterator transformedData = + Scan.transformPhysicalData( + engine, + desc.getScanState(), + desc.getScanFileRow(), + physicalDataIter)) { + while (transformedData.hasNext()) { + FilteredColumnarBatch filteredData = transformedData.next(); + try (CloseableIterator rows = filteredData.getRows()) { + while (rows.hasNext()) { + io.delta.kernel.data.Row row = rows.next(); + c.output(convertKernelRowToBeamRow(row, beamSchema)); } } } @@ -325,7 +349,10 @@ private static org.apache.hadoop.conf.Configuration getHadoopConfiguration( private static org.apache.beam.sdk.schemas.Schema inferBeamSchema(StructType structType) { org.apache.beam.sdk.schemas.Schema.Builder builder = org.apache.beam.sdk.schemas.Schema.builder(); for (StructField field : structType.fields()) { - builder.addField(field.getName(), toBeamFieldType(field.getDataType())); + 
org.apache.beam.sdk.schemas.Schema.Field beamField = + org.apache.beam.sdk.schemas.Schema.Field.of(field.getName(), toBeamFieldType(field.getDataType())) + .withNullable(field.isNullable()); + builder.addField(beamField); } return builder.build(); } @@ -376,13 +403,18 @@ private static Row convertKernelRowToBeamRow( values.add(null); } else { DataType dataType = structType.at(i).getDataType(); - values.add(convertValue(deltaRow, i, dataType)); + org.apache.beam.sdk.schemas.Schema.FieldType beamFieldType = beamSchema.getField(i).getType(); + values.add(convertValue(deltaRow, i, dataType, beamFieldType)); } } return Row.withSchema(beamSchema).addValues(values).build(); } - private static Object convertValue(io.delta.kernel.data.Row deltaRow, int ordinal, DataType dataType) { + private static Object convertValue( + io.delta.kernel.data.Row deltaRow, + int ordinal, + DataType dataType, + org.apache.beam.sdk.schemas.Schema.FieldType beamFieldType) { if (deltaRow.isNullAt(ordinal)) { return null; } @@ -414,18 +446,19 @@ private static Object convertValue(io.delta.kernel.data.Row deltaRow, int ordina return org.joda.time.Instant.ofEpochMilli(days * 24L * 60 * 60 * 1000); } else if (dataType instanceof StructType) { io.delta.kernel.data.Row structRow = deltaRow.getStruct(ordinal); - return convertKernelRowToBeamRow(structRow, inferBeamSchema((StructType) dataType)); + return convertKernelRowToBeamRow(structRow, beamFieldType.getRowSchema()); } else if (dataType instanceof ArrayType) { ArrayValue arrayVal = deltaRow.getArray(ordinal); int size = arrayVal.getSize(); List list = new ArrayList<>(size); DataType elemType = ((ArrayType) dataType).getElementType(); + org.apache.beam.sdk.schemas.Schema.FieldType beamCollectionElementType = beamFieldType.getCollectionElementType(); ColumnVector vec = arrayVal.getElements(); for (int j = 0; j < size; j++) { if (vec.isNullAt(j)) { list.add(null); } else { - list.add(convertVectorValue(vec, j, elemType)); + 
list.add(convertVectorValue(vec, j, elemType, beamCollectionElementType)); } } return list; @@ -435,11 +468,13 @@ private static Object convertValue(io.delta.kernel.data.Row deltaRow, int ordina Map map = new HashMap<>(size); DataType keyType = ((MapType) dataType).getKeyType(); DataType valueType = ((MapType) dataType).getValueType(); + org.apache.beam.sdk.schemas.Schema.FieldType beamMapKeyType = beamFieldType.getMapKeyType(); + org.apache.beam.sdk.schemas.Schema.FieldType beamMapValueType = beamFieldType.getMapValueType(); ColumnVector keysVec = mapVal.getKeys(); ColumnVector valuesVec = mapVal.getValues(); for (int j = 0; j < size; j++) { - Object key = convertVectorValue(keysVec, j, keyType); - Object val = valuesVec.isNullAt(j) ? null : convertVectorValue(valuesVec, j, valueType); + Object key = convertVectorValue(keysVec, j, keyType, beamMapKeyType); + Object val = valuesVec.isNullAt(j) ? null : convertVectorValue(valuesVec, j, valueType, beamMapValueType); map.put(key, val); } return map; @@ -448,7 +483,11 @@ private static Object convertValue(io.delta.kernel.data.Row deltaRow, int ordina } } - private static Object convertVectorValue(ColumnVector vec, int rowId, DataType dataType) { + private static Object convertVectorValue( + ColumnVector vec, + int rowId, + DataType dataType, + org.apache.beam.sdk.schemas.Schema.FieldType beamFieldType) { if (dataType instanceof IntegerType) { return vec.getInt(rowId); } else if (dataType instanceof LongType) { @@ -469,8 +508,609 @@ private static Object convertVectorValue(ColumnVector vec, int rowId, DataType d return vec.getBinary(rowId); } else if (dataType instanceof DecimalType) { return vec.getDecimal(rowId); + } else if (dataType instanceof TimestampType) { + long micros = vec.getLong(rowId); + return org.joda.time.Instant.ofEpochMilli(micros / 1000); + } else if (dataType instanceof DateType) { + int days = vec.getInt(rowId); + return org.joda.time.Instant.ofEpochMilli(days * 24L * 60 * 60 * 1000); + } else if 
(dataType instanceof StructType) { + io.delta.kernel.data.Row structRow = getStructFromVector(vec, rowId, (StructType) dataType); + return convertKernelRowToBeamRow(structRow, beamFieldType.getRowSchema()); + } else if (dataType instanceof ArrayType) { + ArrayValue arrayVal = vec.getArray(rowId); + int size = arrayVal.getSize(); + List list = new ArrayList<>(size); + DataType elemType = ((ArrayType) dataType).getElementType(); + org.apache.beam.sdk.schemas.Schema.FieldType beamCollectionElementType = beamFieldType.getCollectionElementType(); + ColumnVector elementsVec = arrayVal.getElements(); + for (int j = 0; j < size; j++) { + if (elementsVec.isNullAt(j)) { + list.add(null); + } else { + list.add(convertVectorValue(elementsVec, j, elemType, beamCollectionElementType)); + } + } + return list; + } else if (dataType instanceof MapType) { + MapValue mapVal = vec.getMap(rowId); + int size = mapVal.getSize(); + Map map = new HashMap<>(size); + DataType keyType = ((MapType) dataType).getKeyType(); + DataType valueType = ((MapType) dataType).getValueType(); + org.apache.beam.sdk.schemas.Schema.FieldType beamMapKeyType = beamFieldType.getMapKeyType(); + org.apache.beam.sdk.schemas.Schema.FieldType beamMapValueType = beamFieldType.getMapValueType(); + ColumnVector keysVec = mapVal.getKeys(); + ColumnVector valuesVec = mapVal.getValues(); + for (int j = 0; j < size; j++) { + Object key = convertVectorValue(keysVec, j, keyType, beamMapKeyType); + Object val = valuesVec.isNullAt(j) ? 
null : convertVectorValue(valuesVec, j, valueType, beamMapValueType); + map.put(key, val); + } + return map; } else { return vec.toString(); } } + + private static io.delta.kernel.data.Row getStructFromVector(ColumnVector vec, int rowId, StructType structType) { + return new io.delta.kernel.data.Row() { + @Override + public StructType getSchema() { + return structType; + } + + @Override + public boolean isNullAt(int ordinal) { + return vec.getChild(ordinal).isNullAt(rowId); + } + + @Override + public boolean getBoolean(int ordinal) { + return vec.getChild(ordinal).getBoolean(rowId); + } + + @Override + public byte getByte(int ordinal) { + return vec.getChild(ordinal).getByte(rowId); + } + + @Override + public short getShort(int ordinal) { + return vec.getChild(ordinal).getShort(rowId); + } + + @Override + public int getInt(int ordinal) { + return vec.getChild(ordinal).getInt(rowId); + } + + @Override + public long getLong(int ordinal) { + return vec.getChild(ordinal).getLong(rowId); + } + + @Override + public float getFloat(int ordinal) { + return vec.getChild(ordinal).getFloat(rowId); + } + + @Override + public double getDouble(int ordinal) { + return vec.getChild(ordinal).getDouble(rowId); + } + + @Override + public String getString(int ordinal) { + return vec.getChild(ordinal).getString(rowId); + } + + @Override + public BigDecimal getDecimal(int ordinal) { + return vec.getChild(ordinal).getDecimal(rowId); + } + + @Override + public byte[] getBinary(int ordinal) { + return vec.getChild(ordinal).getBinary(rowId); + } + + @Override + public io.delta.kernel.data.Row getStruct(int ordinal) { + return getStructFromVector(vec.getChild(ordinal), rowId, (StructType) vec.getChild(ordinal).getDataType()); + } + + @Override + public ArrayValue getArray(int ordinal) { + return vec.getChild(ordinal).getArray(rowId); + } + + @Override + public MapValue getMap(int ordinal) { + return vec.getChild(ordinal).getMap(rowId); + } + }; + } + + public abstract static class 
SerializableDataType implements Serializable { + public abstract DataType toDataType(); + + public static SerializableDataType fromDataType(DataType type) { + if (type instanceof IntegerType) { + return new SerializablePrimitive(0); + } else if (type instanceof LongType) { + return new SerializablePrimitive(1); + } else if (type instanceof StringType) { + return new SerializablePrimitive(2); + } else if (type instanceof DoubleType) { + return new SerializablePrimitive(3); + } else if (type instanceof FloatType) { + return new SerializablePrimitive(4); + } else if (type instanceof BooleanType) { + return new SerializablePrimitive(5); + } else if (type instanceof ShortType) { + return new SerializablePrimitive(6); + } else if (type instanceof ByteType) { + return new SerializablePrimitive(7); + } else if (type instanceof BinaryType) { + return new SerializablePrimitive(8); + } else if (type instanceof DateType) { + return new SerializablePrimitive(9); + } else if (type instanceof TimestampType) { + return new SerializablePrimitive(10); + } else if (type instanceof DecimalType) { + DecimalType dt = (DecimalType) type; + return new SerializableDecimal(dt.getPrecision(), dt.getScale()); + } else if (type instanceof StructType) { + return new SerializableStruct((StructType) type); + } else if (type instanceof ArrayType) { + ArrayType at = (ArrayType) type; + return new SerializableArray(at.getElementField()); + } else if (type instanceof MapType) { + MapType mt = (MapType) type; + return new SerializableMap(mt.getKeyField(), mt.getValueField()); + } else { + throw new IllegalArgumentException("Unsupported DataType: " + type); + } + } + } + + public static class SerializablePrimitive extends SerializableDataType { + private final int typeId; + + public SerializablePrimitive(int typeId) { + this.typeId = typeId; + } + + @Override + public DataType toDataType() { + switch (typeId) { + case 0: return IntegerType.INTEGER; + case 1: return LongType.LONG; + case 2: return 
StringType.STRING; + case 3: return DoubleType.DOUBLE; + case 4: return FloatType.FLOAT; + case 5: return BooleanType.BOOLEAN; + case 6: return ShortType.SHORT; + case 7: return ByteType.BYTE; + case 8: return BinaryType.BINARY; + case 9: return DateType.DATE; + case 10: return TimestampType.TIMESTAMP; + default: throw new IllegalStateException(); + } + } + } + + public static class SerializableDecimal extends SerializableDataType { + private final int precision; + private final int scale; + + public SerializableDecimal(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + + @Override + public DataType toDataType() { + return new DecimalType(precision, scale); + } + } + + public static class SerializableStructField implements Serializable { + private final String name; + private final SerializableDataType type; + private final boolean nullable; + + public SerializableStructField(StructField field) { + this.name = field.getName(); + this.type = SerializableDataType.fromDataType(field.getDataType()); + this.nullable = field.isNullable(); + } + + public StructField toStructField() { + return new StructField(name, type.toDataType(), nullable); + } + } + + public static class SerializableStruct extends SerializableDataType { + private final List fields; + + public SerializableStruct(StructType type) { + this.fields = new ArrayList<>(); + for (StructField f : type.fields()) { + this.fields.add(new SerializableStructField(f)); + } + } + + @Override + public DataType toDataType() { + List list = new ArrayList<>(); + for (SerializableStructField f : fields) { + list.add(f.toStructField()); + } + return new StructType(list); + } + } + + public static class SerializableArray extends SerializableDataType { + private final SerializableStructField elementField; + + public SerializableArray(StructField elementField) { + this.elementField = new SerializableStructField(elementField); + } + + @Override + public DataType toDataType() { + return new 
ArrayType(elementField.toStructField()); + } + } + + public static class SerializableMap extends SerializableDataType { + private final SerializableStructField keyField; + private final SerializableStructField valueField; + + public SerializableMap(StructField keyField, StructField valueField) { + this.keyField = new SerializableStructField(keyField); + this.valueField = new SerializableStructField(valueField); + } + + @Override + public DataType toDataType() { + return new MapType(keyField.toStructField(), valueField.toStructField()); + } + } + + public static class SerializableRow implements io.delta.kernel.data.Row, Serializable { + private final SerializableStruct schema; + private final List values; + + public SerializableRow(io.delta.kernel.data.Row source) { + this.schema = new SerializableStruct(source.getSchema()); + this.values = new ArrayList<>(); + StructType structType = source.getSchema(); + for (int i = 0; i < structType.length(); i++) { + if (source.isNullAt(i)) { + this.values.add(null); + } else { + DataType type = structType.at(i).getDataType(); + this.values.add(deepCopyValue(source, i, type)); + } + } + } + + private static Object deepCopyValue(io.delta.kernel.data.Row source, int ordinal, DataType type) { + if (source.isNullAt(ordinal)) { + return null; + } + if (type instanceof IntegerType) { + return source.getInt(ordinal); + } else if (type instanceof LongType) { + return source.getLong(ordinal); + } else if (type instanceof StringType) { + return source.getString(ordinal); + } else if (type instanceof DoubleType) { + return source.getDouble(ordinal); + } else if (type instanceof FloatType) { + return source.getFloat(ordinal); + } else if (type instanceof BooleanType) { + return source.getBoolean(ordinal); + } else if (type instanceof ShortType) { + return source.getShort(ordinal); + } else if (type instanceof ByteType) { + return source.getByte(ordinal); + } else if (type instanceof BinaryType) { + return source.getBinary(ordinal); + } 
else if (type instanceof DecimalType) { + return source.getDecimal(ordinal); + } else if (type instanceof TimestampType) { + return source.getLong(ordinal); + } else if (type instanceof DateType) { + return source.getInt(ordinal); + } else if (type instanceof StructType) { + return new SerializableRow(source.getStruct(ordinal)); + } else if (type instanceof ArrayType) { + return new SerializableArrayValue(source.getArray(ordinal), ((ArrayType) type).getElementType()); + } else if (type instanceof MapType) { + return new SerializableMapValue(source.getMap(ordinal), (MapType) type); + } else { + throw new IllegalArgumentException("Unsupported type: " + type); + } + } + + @Override + public StructType getSchema() { + return (StructType) schema.toDataType(); + } + + @Override + public boolean isNullAt(int ordinal) { + return values.get(ordinal) == null; + } + + @Override + public boolean getBoolean(int ordinal) { + return (Boolean) values.get(ordinal); + } + + @Override + public byte getByte(int ordinal) { + return (Byte) values.get(ordinal); + } + + @Override + public short getShort(int ordinal) { + return (Short) values.get(ordinal); + } + + @Override + public int getInt(int ordinal) { + return (Integer) values.get(ordinal); + } + + @Override + public long getLong(int ordinal) { + return (Long) values.get(ordinal); + } + + @Override + public float getFloat(int ordinal) { + return (Float) values.get(ordinal); + } + + @Override + public double getDouble(int ordinal) { + return (Double) values.get(ordinal); + } + + @Override + public String getString(int ordinal) { + return (String) values.get(ordinal); + } + + @Override + public BigDecimal getDecimal(int ordinal) { + return (BigDecimal) values.get(ordinal); + } + + @Override + public byte[] getBinary(int ordinal) { + return (byte[]) values.get(ordinal); + } + + @Override + public io.delta.kernel.data.Row getStruct(int ordinal) { + return (io.delta.kernel.data.Row) values.get(ordinal); + } + + @Override + public 
ArrayValue getArray(int ordinal) { + return (ArrayValue) values.get(ordinal); + } + + @Override + public MapValue getMap(int ordinal) { + return (MapValue) values.get(ordinal); + } + } + + public static class SerializableArrayValue implements ArrayValue, Serializable { + private final SerializableDataType elementType; + private final SerializableColumnVector elements; + private final int size; + + public SerializableArrayValue(ArrayValue source, DataType elemType) { + this.elementType = SerializableDataType.fromDataType(elemType); + this.size = source.getSize(); + this.elements = new SerializableColumnVector(source.getElements(), elemType); + } + + @Override + public int getSize() { + return size; + } + + @Override + public ColumnVector getElements() { + return elements; + } + } + + public static class SerializableMapValue implements MapValue, Serializable { + private final SerializableColumnVector keys; + private final SerializableColumnVector values; + private final int size; + + public SerializableMapValue(MapValue source, MapType mapType) { + this.size = source.getSize(); + this.keys = new SerializableColumnVector(source.getKeys(), mapType.getKeyType()); + this.values = new SerializableColumnVector(source.getValues(), mapType.getValueType()); + } + + @Override + public int getSize() { + return size; + } + + @Override + public ColumnVector getKeys() { + return keys; + } + + @Override + public ColumnVector getValues() { + return values; + } + } + + public static class SerializableColumnVector implements ColumnVector, Serializable { + private final SerializableDataType dataType; + private final int size; + private final List values; + private final List children; + + public SerializableColumnVector(ColumnVector source, DataType type) { + this.dataType = SerializableDataType.fromDataType(type); + this.size = source.getSize(); + this.values = new ArrayList<>(); + this.children = new ArrayList<>(); + + if (type instanceof StructType) { + StructType structType = 
(StructType) type; + for (int i = 0; i < structType.length(); i++) { + children.add(new SerializableColumnVector(source.getChild(i), structType.at(i).getDataType())); + } + } + + for (int i = 0; i < size; i++) { + if (source.isNullAt(i)) { + this.values.add(null); + } else { + this.values.add(deepCopyVectorValue(source, i, type)); + } + } + } + + @Override + public ColumnVector getChild(int ordinal) { + if (children.isEmpty()) { + throw new UnsupportedOperationException( + "Child vectors are not available for vector of type " + getDataType()); + } + return children.get(ordinal); + } + + private static Object deepCopyVectorValue(ColumnVector source, int rowId, DataType type) { + if (type instanceof IntegerType) { + return source.getInt(rowId); + } else if (type instanceof LongType) { + return source.getLong(rowId); + } else if (type instanceof StringType) { + return source.getString(rowId); + } else if (type instanceof DoubleType) { + return source.getDouble(rowId); + } else if (type instanceof FloatType) { + return source.getFloat(rowId); + } else if (type instanceof BooleanType) { + return source.getBoolean(rowId); + } else if (type instanceof ShortType) { + return source.getShort(rowId); + } else if (type instanceof ByteType) { + return source.getByte(rowId); + } else if (type instanceof BinaryType) { + return source.getBinary(rowId); + } else if (type instanceof DecimalType) { + return source.getDecimal(rowId); + } else if (type instanceof TimestampType) { + return source.getLong(rowId); + } else if (type instanceof DateType) { + return source.getInt(rowId); + } else if (type instanceof StructType) { + return new SerializableRow(getStructFromVector(source, rowId, (StructType) type)); + } else if (type instanceof ArrayType) { + return new SerializableArrayValue(source.getArray(rowId), ((ArrayType) type).getElementType()); + } else if (type instanceof MapType) { + return new SerializableMapValue(source.getMap(rowId), (MapType) type); + } else { + throw new 
IllegalArgumentException("Unsupported vector type: " + type); + } + } + + @Override + public DataType getDataType() { + return dataType.toDataType(); + } + + @Override + public int getSize() { + return size; + } + + @Override + public boolean isNullAt(int rowId) { + return values.get(rowId) == null; + } + + @Override + public boolean getBoolean(int rowId) { + return (Boolean) values.get(rowId); + } + + @Override + public byte getByte(int rowId) { + return (Byte) values.get(rowId); + } + + @Override + public short getShort(int rowId) { + return (Short) values.get(rowId); + } + + @Override + public int getInt(int rowId) { + return (Integer) values.get(rowId); + } + + @Override + public long getLong(int rowId) { + return (Long) values.get(rowId); + } + + @Override + public float getFloat(int rowId) { + return (Float) values.get(rowId); + } + + @Override + public double getDouble(int rowId) { + return (Double) values.get(rowId); + } + + @Override + public byte[] getBinary(int rowId) { + return (byte[]) values.get(rowId); + } + + @Override + public String getString(int rowId) { + return (String) values.get(rowId); + } + + @Override + public BigDecimal getDecimal(int rowId) { + return (BigDecimal) values.get(rowId); + } + + @Override + public ArrayValue getArray(int rowId) { + return (ArrayValue) values.get(rowId); + } + + @Override + public MapValue getMap(int rowId) { + return (MapValue) values.get(rowId); + } + + @Override + public void close() {} + } }