From ce27cd81e7ff385c5fc7dc929759e1cfeb3496d8 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Fri, 13 Mar 2026 14:09:11 +0000 Subject: [PATCH] [GLUTEN-11550][VL][UT] Enable Variant test suites Enable GlutenVariantEndToEndSuite and GlutenVariantShreddingSuite for both spark40 and spark41, and GlutenParquetVariantShreddingSuite for spark41. Changes: 1. VeloxValidatorApi: Detect variant shredded structs produced by Spark's PushVariantIntoScan (non-empty structs whose fields all carry __VARIANT_METADATA_KEY metadata) to trigger fallback to Spark's native Parquet reader. 2. ParquetMetadataUtils: Add variant annotation check in isUnsupportedMetadata, gated by parquetMetadataValidationEnabled. Reads the same footer, no extra I/O. 3. SparkShims/Spark41Shims: Add shouldFallbackForParquetVariantAnnotation (defaults to false in SparkShims); the Spark41Shims override detects Parquet variant logical type annotations unless SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION is set. 4. GlutenParquetVariantShreddingSuite (spark41): Set parquetMetadataValidationEnabled=true to enable variant annotation fallback detection. 5. pom.xml: Add -Dfile.encoding=UTF-8 to test JVM args. On JDK 17 with LANG=C (CI containers centos-8/9), the default charset is US-ASCII, causing garbled output for multi-byte characters. JDK 18+ defaults to UTF-8 via JEP 400. 
See: https://github.com/apache/spark/blob/v4.0.1/common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java#L508 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../backendsapi/velox/VeloxBackend.scala | 7 +++---- .../backendsapi/velox/VeloxValidatorApi.scala | 9 +++++++++ .../gluten/utils/ParquetMetadataUtils.scala | 6 ++++++ .../utils/velox/VeloxTestSettings.scala | 4 ++-- .../utils/velox/VeloxTestSettings.scala | 6 +++--- .../GlutenParquetVariantShreddingSuite.scala | 9 ++++++++- pom.xml | 1 + .../apache/gluten/sql/shims/SparkShims.scala | 2 ++ .../sql/shims/spark41/Spark41Shims.scala | 19 ++++++++++++++++++- 9 files changed, 52 insertions(+), 11 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 6d5a2a6c2abd..9694c8a3ae91 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -209,10 +209,9 @@ object VeloxBackendSettings extends BackendSettingsApi { } val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get) - val parquetMetadataValidationResult = - ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit) - parquetMetadataValidationResult.map( - reason => s"Detected unsupported metadata in parquet files: $reason") + ParquetMetadataUtils + .validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit) + .map(reason => s"Detected unsupported metadata in parquet files: $reason") } def validateDataSchema(): Option[String] = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala index 2d2193a53871..1676e91d179f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala @@ -121,6 +121,15 @@ object VeloxValidatorApi { case map: MapType => validateSchema(map.keyType).orElse(validateSchema(map.valueType)) case struct: StructType => + // Detect variant shredded struct produced by Spark's PushVariantIntoScan. + // These structs have all fields annotated with __VARIANT_METADATA_KEY metadata. + // Velox cannot read the variant shredding encoding in Parquet files. + if ( + struct.fields.nonEmpty && + struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY")) + ) { + return Some(s"Variant shredded struct is not supported: $struct") + } struct.foreach { field => val reason = validateSchema(field.dataType) diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index ab76cba4aa5d..9864f42a9f3c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -148,6 +148,12 @@ object ParquetMetadataUtils extends Logging { isTimezoneFoundInMetadata(footer, parquetOptions) ) + // Variant annotation check: Velox native reader does not check variant annotations, + // so fallback to vanilla Spark when detected. 
+ if (SparkShimLoader.getSparkShims.shouldFallbackForParquetVariantAnnotation(footer)) { + return Some("Variant annotation detected in Parquet file.") + } + for (check <- validationChecks) { if (check.isDefined) { return check diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 2f2a99a1a5e4..64aca6fa2271 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -857,8 +857,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenUDTRegistrationSuite] enableSuite[GlutenUnsafeRowSuite] enableSuite[GlutenUserDefinedTypeSuite] - // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures - // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures + enableSuite[GlutenVariantEndToEndSuite] + enableSuite[GlutenVariantShreddingSuite] enableSuite[GlutenVariantSuite] enableSuite[GlutenVariantWriteShreddingSuite] enableSuite[GlutenXmlFunctionsSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index cbbe61255313..4e14df0d8fdd 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -399,7 +399,7 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet widening conversion ShortType -> DecimalType(20,0)") .exclude("parquet widening conversion ShortType -> DecimalType(38,0)") .exclude("parquet widening conversion ShortType -> DoubleType") - // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure + enableSuite[GlutenParquetVariantShreddingSuite] // 
Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure @@ -822,8 +822,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenUDTRegistrationSuite] enableSuite[GlutenUnsafeRowSuite] enableSuite[GlutenUserDefinedTypeSuite] - // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures - // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures + enableSuite[GlutenVariantEndToEndSuite] + enableSuite[GlutenVariantShreddingSuite] enableSuite[GlutenVariantSuite] enableSuite[GlutenVariantWriteShreddingSuite] enableSuite[GlutenXmlFunctionsSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala index ea9d5a74189e..deb92a27fdff 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala @@ -16,8 +16,15 @@ */ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.gluten.config.GlutenConfig + import org.apache.spark.sql.GlutenSQLTestsTrait class GlutenParquetVariantShreddingSuite extends ParquetVariantShreddingSuite - with GlutenSQLTestsTrait {} + with GlutenSQLTestsTrait { + + override def sparkConf: org.apache.spark.SparkConf = { + super.sparkConf.set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key, "true") + } +} diff --git a/pom.xml b/pom.xml index f6b10ba991ea..2227f9d353eb 100644 --- a/pom.xml +++ b/pom.xml @@ -167,6 +167,7 @@ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false 
-Dio.netty.tryReflectionSetAccessible=true + -Dfile.encoding=UTF-8 file:src/test/resources/log4j2.properties diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index 1f6d015393f1..2f5350f38a08 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -237,6 +237,8 @@ trait SparkShims { def isParquetFileEncrypted(footer: ParquetMetadata): Boolean + def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false + def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]] diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala index 0e3e752f9970..5ff9d51c71db 100644 --- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala +++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala @@ -53,7 +53,7 @@ import org.apache.spark.sql.types._ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata} import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType -import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType} import java.time.ZoneOffset import java.util.{Map => JMap} @@ -571,6 +571,23 @@ class Spark41Shims extends SparkShims { } } + override def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = { + if (SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)) { + false + } else { + containsVariantAnnotation(footer.getFileMetaData.getSchema) + } + } + + private def 
containsVariantAnnotation(groupType: GroupType): Boolean = { + groupType.getFields.asScala.exists { + field => + Option(field.getLogicalTypeAnnotation) + .exists(_.isInstanceOf[LogicalTypeAnnotation.VariantLogicalTypeAnnotation]) || + (!field.isPrimitive && containsVariantAnnotation(field.asGroupType())) + } + } + override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]