diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 6d5a2a6c2abd..9694c8a3ae91 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -209,10 +209,9 @@ object VeloxBackendSettings extends BackendSettingsApi { } val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get) - val parquetMetadataValidationResult = - ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit) - parquetMetadataValidationResult.map( - reason => s"Detected unsupported metadata in parquet files: $reason") + ParquetMetadataUtils + .validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit) + .map(reason => s"Detected unsupported metadata in parquet files: $reason") } def validateDataSchema(): Option[String] = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala index 2d2193a53871..1676e91d179f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala @@ -121,6 +121,15 @@ object VeloxValidatorApi { case map: MapType => validateSchema(map.keyType).orElse(validateSchema(map.valueType)) case struct: StructType => + // Detect variant shredded struct produced by Spark's PushVariantIntoScan. + // These structs have all fields annotated with __VARIANT_METADATA_KEY metadata. + // Velox cannot read the variant shredding encoding in Parquet files. + if ( + struct.fields.nonEmpty && + struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY")) + ) { + return Some(s"Variant shredded struct is not supported: $struct") + } struct.foreach { field => val reason = validateSchema(field.dataType) diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index ab76cba4aa5d..9864f42a9f3c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -148,6 +148,12 @@ object ParquetMetadataUtils extends Logging { isTimezoneFoundInMetadata(footer, parquetOptions) ) + // Variant annotation check: Velox native reader does not check variant annotations, + // so fallback to vanilla Spark when detected. + if (SparkShimLoader.getSparkShims.shouldFallbackForParquetVariantAnnotation(footer)) { + return Some("Variant annotation detected in Parquet file.") + } + for (check <- validationChecks) { if (check.isDefined) { return check diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 2f2a99a1a5e4..64aca6fa2271 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -857,8 +857,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenUDTRegistrationSuite] enableSuite[GlutenUnsafeRowSuite] enableSuite[GlutenUserDefinedTypeSuite] - // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures - // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures + enableSuite[GlutenVariantEndToEndSuite] + enableSuite[GlutenVariantShreddingSuite] enableSuite[GlutenVariantSuite] enableSuite[GlutenVariantWriteShreddingSuite] enableSuite[GlutenXmlFunctionsSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index cbbe61255313..4e14df0d8fdd 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -399,7 +399,7 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet widening conversion ShortType -> DecimalType(20,0)") .exclude("parquet widening conversion ShortType -> DecimalType(38,0)") .exclude("parquet widening conversion ShortType -> DoubleType") - // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure + enableSuite[GlutenParquetVariantShreddingSuite] // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure @@ -822,8 +822,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenUDTRegistrationSuite] enableSuite[GlutenUnsafeRowSuite] enableSuite[GlutenUserDefinedTypeSuite] - // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures - // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures + enableSuite[GlutenVariantEndToEndSuite] + enableSuite[GlutenVariantShreddingSuite] enableSuite[GlutenVariantSuite] enableSuite[GlutenVariantWriteShreddingSuite] enableSuite[GlutenXmlFunctionsSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala index ea9d5a74189e..deb92a27fdff 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala @@ -16,8 +16,15 @@ */ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.gluten.config.GlutenConfig + import org.apache.spark.sql.GlutenSQLTestsTrait class GlutenParquetVariantShreddingSuite extends ParquetVariantShreddingSuite - with GlutenSQLTestsTrait {} + with GlutenSQLTestsTrait { + + override def sparkConf: org.apache.spark.SparkConf = { + super.sparkConf.set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key, "true") + } +} diff --git a/pom.xml b/pom.xml index f6b10ba991ea..2227f9d353eb 100644 --- a/pom.xml +++ b/pom.xml @@ -167,6 +167,7 @@ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true + -Dfile.encoding=UTF-8 file:src/test/resources/log4j2.properties diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index 1f6d015393f1..2f5350f38a08 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -237,6 +237,8 @@ trait SparkShims { def isParquetFileEncrypted(footer: ParquetMetadata): Boolean + def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false + def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]] diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala index 0e3e752f9970..5ff9d51c71db 100644 --- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala +++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala @@ -53,7 +53,7 @@ import org.apache.spark.sql.types._ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata} import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType -import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType} import java.time.ZoneOffset import java.util.{Map => JMap} @@ -571,6 +571,23 @@ class Spark41Shims extends SparkShims { } } + override def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = { + if (SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)) { + false + } else { + containsVariantAnnotation(footer.getFileMetaData.getSchema) + } + } + + private def containsVariantAnnotation(groupType: GroupType): Boolean = { + groupType.getFields.asScala.exists { + field => + Option(field.getLogicalTypeAnnotation) + .exists(_.isInstanceOf[LogicalTypeAnnotation.VariantLogicalTypeAnnotation]) || + (!field.isPrimitive && containsVariantAnnotation(field.asGroupType())) + } + } + override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]