@@ -209,10 +209,9 @@ object VeloxBackendSettings extends BackendSettingsApi {
       }
       val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
       val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
-      val parquetMetadataValidationResult =
-        ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
-      parquetMetadataValidationResult.map(
-        reason => s"Detected unsupported metadata in parquet files: $reason")
+      ParquetMetadataUtils
+        .validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
+        .map(reason => s"Detected unsupported metadata in parquet files: $reason")
     }

     def validateDataSchema(): Option[String] = {
@@ -121,6 +121,15 @@ object VeloxValidatorApi {
       case map: MapType =>
         validateSchema(map.keyType).orElse(validateSchema(map.valueType))
       case struct: StructType =>
+        // Detect variant shredded struct produced by Spark's PushVariantIntoScan.
+        // These structs have all fields annotated with __VARIANT_METADATA_KEY metadata.
+        // Velox cannot read the variant shredding encoding in Parquet files.
+        if (
+          struct.fields.nonEmpty &&
Contributor: Do we need to add a nonEmpty check?

Contributor (author): Yes, `nonEmpty` is needed because `forall` returns true for an empty collection. Without it, an empty struct would be incorrectly detected as variant shredded.
+          struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY"))
+        ) {
+          return Some(s"Variant shredded struct is not supported: $struct")
Contributor: Might the struct `$struct` be too heavy to print?

Contributor (author): This is consistent with the existing pattern at line 144, `s"Schema / data type not supported: $schema"`, which also prints the full type. Since this is a validation failure message (logged once per scan fallback, not per row), the overhead is negligible.
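As a rough sketch of the cost under discussion (output shape approximate, not from the PR):

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val struct = StructType(Seq(StructField("a", IntegerType)))
// Interpolating $struct calls StructType.toString, which renders every field,
// yielding something like:
//   Variant shredded struct is not supported: StructType(StructField(a,IntegerType,true,{}))
// The string is built once per failed validation, not per row.
println(s"Variant shredded struct is not supported: $struct")
```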
+        }
         struct.foreach {
           field =>
             val reason = validateSchema(field.dataType)
@@ -148,6 +148,12 @@ object ParquetMetadataUtils extends Logging {
       isTimezoneFoundInMetadata(footer, parquetOptions)
     )

+    // Variant annotation check: the Velox native reader does not handle variant
+    // annotations, so fall back to vanilla Spark when one is detected.
+    if (SparkShimLoader.getSparkShims.shouldFallbackForParquetVariantAnnotation(footer)) {
+      return Some("Variant annotation detected in Parquet file.")
+    }
+
     for (check <- validationChecks) {
       if (check.isDefined) {
         return check
@@ -857,8 +857,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenUDTRegistrationSuite]
   enableSuite[GlutenUnsafeRowSuite]
   enableSuite[GlutenUserDefinedTypeSuite]
-  // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures
-  // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures
+  enableSuite[GlutenVariantEndToEndSuite]
+  enableSuite[GlutenVariantShreddingSuite]
   enableSuite[GlutenVariantSuite]
   enableSuite[GlutenVariantWriteShreddingSuite]
   enableSuite[GlutenXmlFunctionsSuite]
@@ -399,7 +399,7 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("parquet widening conversion ShortType -> DecimalType(20,0)")
     .exclude("parquet widening conversion ShortType -> DecimalType(38,0)")
     .exclude("parquet widening conversion ShortType -> DoubleType")
-  // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure
+  enableSuite[GlutenParquetVariantShreddingSuite]
   // Generated suites for org.apache.spark.sql.execution.datasources.text
   // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
   // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure
@@ -822,8 +822,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenUDTRegistrationSuite]
   enableSuite[GlutenUnsafeRowSuite]
   enableSuite[GlutenUserDefinedTypeSuite]
-  // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures
-  // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures
+  enableSuite[GlutenVariantEndToEndSuite]
+  enableSuite[GlutenVariantShreddingSuite]
   enableSuite[GlutenVariantSuite]
   enableSuite[GlutenVariantWriteShreddingSuite]
   enableSuite[GlutenXmlFunctionsSuite]
@@ -16,8 +16,15 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet

+import org.apache.gluten.config.GlutenConfig
+
 import org.apache.spark.sql.GlutenSQLTestsTrait

 class GlutenParquetVariantShreddingSuite
   extends ParquetVariantShreddingSuite
-  with GlutenSQLTestsTrait {}
+  with GlutenSQLTestsTrait {
+
+  override def sparkConf: org.apache.spark.SparkConf = {
+    super.sparkConf.set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key, "true")
+  }
+}
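For context, the session-level equivalent of the suite's override might look as follows (a sketch, assuming the config is runtime-settable and `spark` is an active SparkSession):

```scala
import org.apache.gluten.config.GlutenConfig

// Enables the Parquet metadata validation pass that the variant-annotation
// check above hooks into; without it the files are scanned natively.
spark.conf.set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key, "true")
```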
pom.xml (1 addition, 0 deletions)
@@ -167,6 +167,7 @@
       --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
       -Djdk.reflect.useDirectMethodHandle=false
       -Dio.netty.tryReflectionSetAccessible=true
+      -Dfile.encoding=UTF-8
     </extraJavaTestArgs>
     <log4j.conf>file:src/test/resources/log4j2.properties</log4j.conf>
   </properties>
@@ -237,6 +237,8 @@ trait SparkShims {

   def isParquetFileEncrypted(footer: ParquetMetadata): Boolean

+  def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false
+
Contributor: Why would we not fall back?

Contributor (author): Default false because Spark 3.x and 4.0 don't have Parquet variant logical type annotations; only Spark 4.1 introduced VariantLogicalTypeAnnotation. Spark41Shims overrides this to actually check the Parquet schema. For older Spark versions, there's nothing to detect, so no fallback is needed.

   def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
     Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types._
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata}
 import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType}

 import java.time.ZoneOffset
 import java.util.{Map => JMap}
@@ -571,6 +571,23 @@ class Spark41Shims extends SparkShims {
     }
   }

+  override def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = {
+    if (SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)) {
+      false
+    } else {
+      containsVariantAnnotation(footer.getFileMetaData.getSchema)
+    }
+  }
+
+  private def containsVariantAnnotation(groupType: GroupType): Boolean = {
+    groupType.getFields.asScala.exists {
+      field =>
+        Option(field.getLogicalTypeAnnotation)
+          .exists(_.isInstanceOf[LogicalTypeAnnotation.VariantLogicalTypeAnnotation]) ||
+        (!field.isPrimitive && containsVariantAnnotation(field.asGroupType()))
+    }
+  }
+
   override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
     file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]