diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala index 91bc39bd2e4b..038afa38982b 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala @@ -16,7 +16,6 @@ */ package org.apache.spark.sql.delta -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.datasource.GlutenFormatFactory import org.apache.spark.internal.Logging @@ -31,16 +30,11 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil -import org.slf4j.LoggerFactory - class GlutenParquetFileFormat extends ParquetFileFormat with DataSourceRegister with Logging with Serializable { - import GlutenParquetFileFormat._ - - private val logger = LoggerFactory.getLogger(classOf[GlutenParquetFileFormat]) override def shortName(): String = "gluten-parquet" @@ -62,39 +56,27 @@ class GlutenParquetFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - if (isNativeWritable(dataSchema)) { - // Pass compression to job conf so that the file extension can be aware of it. - val conf = ContextUtil.getConfiguration(job) - val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) - conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) - val nativeConf = - GlutenFormatFactory("parquet") - .nativeConf(options, parquetOptions.compressionCodecClassName) + // Pass compression to job conf so that the file extension can be aware of it. 
+ val conf = ContextUtil.getConfiguration(job) + val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) + conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) + val nativeConf = + GlutenFormatFactory("parquet") + .nativeConf(options, parquetOptions.compressionCodecClassName) - return new OutputWriterFactory { - override def getFileExtension(context: TaskAttemptContext): String = { - CodecConfig.from(context).getCodec.getExtension + ".parquet" - } + new OutputWriterFactory { + override def getFileExtension(context: TaskAttemptContext): String = { + CodecConfig.from(context).getCodec.getExtension + ".parquet" + } - override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - GlutenFormatFactory("parquet") - .createOutputWriter(path, dataSchema, context, nativeConf) + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + GlutenFormatFactory("parquet") + .createOutputWriter(path, dataSchema, context, nativeConf) - } } } - logger.warn( - s"Data schema is unsupported by Gluten Parquet writer: $dataSchema, " + - s"falling back to the vanilla Spark Parquet writer") - super.prepareWrite(sparkSession, job, options, dataSchema) - } -} - -object GlutenParquetFileFormat { - def isNativeWritable(schema: StructType): Boolean = { - BackendsApiManager.getSettings.supportNativeWrite(schema.fields) } } diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala index 653b705becf0..ee3b49e47eef 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala @@ -34,7 +34,7 @@ import 
org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.connector.write.WriterCommitMessage -import org.apache.spark.sql.delta.{DeltaOptions, GlutenParquetFileFormat} +import org.apache.spark.sql.delta.DeltaOptions import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.stats.GlutenDeltaJobStatsTracker import org.apache.spark.sql.errors.QueryExecutionErrors @@ -126,7 +126,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { val dataSchema = dataColumns.toStructType DataSourceUtils.verifySchema(fileFormat, dataSchema) DataSourceUtils.checkFieldNames(fileFormat, dataSchema) - val isNativeWritable = GlutenParquetFileFormat.isNativeWritable(dataSchema) + val isNativeWritable = true val outputDataColumns = if (caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true")) { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 6d5a2a6c2abd..7fb04d28ae5f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -285,89 +285,38 @@ object VeloxBackendSettings extends BackendSettingsApi { isPartitionedTable: Boolean, options: Map[String, String]): ValidationResult = { - // Validate if HiveFileFormat write is supported based on output file type - def validateHiveFileFormat(hiveFileFormat: HiveFileFormat): Option[String] = { - // Reflect to get access to fileSinkConf which contains the output file format - val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf") - fileSinkConfField.setAccessible(true) - val fileSinkConf = fileSinkConfField.get(hiveFileFormat) - 
val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo") - tableInfoField.setAccessible(true) - val tableInfo = tableInfoField.get(fileSinkConf) - val getOutputFileFormatClassNameMethod = tableInfo.getClass - .getDeclaredMethod("getOutputFileFormatClassName") - val outputFileFormatClassName = getOutputFileFormatClassNameMethod.invoke(tableInfo) - - // Match based on the output file format class name - outputFileFormatClassName match { - case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" => - None - case _ => - Some( - "HiveFileFormat is supported only with Parquet as the output file type" - ) // Unsupported format + def validateFileFormat(): Option[String] = { + format match { + case _: ParquetFileFormat => None + case h: HiveFileFormat if GlutenConfig.get.enableHiveFileFormatWriter => + // Validate HiveFileFormat is backed by Parquet + val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf") + fileSinkConfField.setAccessible(true) + val fileSinkConf = fileSinkConfField.get(h) + val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo") + tableInfoField.setAccessible(true) + val tableInfo = tableInfoField.get(fileSinkConf) + val outputFormat = tableInfo.getClass + .getDeclaredMethod("getOutputFileFormatClassName") + .invoke(tableInfo) + outputFormat match { + case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" => None + case _ => Some("HiveFileFormat is supported only with Parquet as the output file type") + } + case _ => Some("Only ParquetFileFormat and HiveFileFormat are supported.") } } def validateCompressionCodec(): Option[String] = { - val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw") - val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options) - if (unSupportedCompressions.contains(compressionCodec)) { - Some(s"$compressionCodec compression codec is unsupported in Velox backend.") + val unsupported = Set("brotli", "lzo", "lz4raw", 
"lz4_raw") + val codec = WriteFilesExecTransformer.getCompressionCodec(options) + if (unsupported.contains(codec)) { + Some(s"$codec compression codec is unsupported in Velox backend.") } else { None } } - // Validate if all types are supported. - def validateDataTypes(): Option[String] = { - val unsupportedTypes = format match { - case _: ParquetFileFormat => - fields.flatMap { - case StructField(_, _: YearMonthIntervalType, _, _) => - Some("YearMonthIntervalType") - case StructField(_, _: StructType, _, _) => - Some("StructType") - case _ => None - } - case _ => - fields.flatMap { - field => - field.dataType match { - case _: StructType => Some("StructType") - case _: ArrayType => Some("ArrayType") - case _: MapType => Some("MapType") - case _: YearMonthIntervalType => Some("YearMonthIntervalType") - case _ => None - } - } - } - if (unsupportedTypes.nonEmpty) { - Some(unsupportedTypes.mkString("Found unsupported type:", ",", "")) - } else { - None - } - } - - def validateFieldMetadata(): Option[String] = { - fields.find(_.metadata != Metadata.empty).map { - filed => - s"StructField contain the metadata information: $filed, metadata: ${filed.metadata}" - } - } - - def validateFileFormat(): Option[String] = { - format match { - case _: ParquetFileFormat => None // Parquet is directly supported - case h: HiveFileFormat if GlutenConfig.get.enableHiveFileFormatWriter => - validateHiveFileFormat(h) // Parquet via Hive SerDe - case _ => - Some( - "Only ParquetFileFormat and HiveFileFormat are supported." 
- ) // Unsupported format - } - } - def validateWriteFilesOptions(): Option[String] = { val maxRecordsPerFile = options .get("maxRecordsPerFile") @@ -381,22 +330,46 @@ object VeloxBackendSettings extends BackendSettingsApi { } def validateBucketSpec(): Option[String] = { - val isHiveCompatibleBucketTable = bucketSpec.nonEmpty && options - .getOrElse("__hive_compatible_bucketed_table_insertion__", "false") - .equals("true") - // Currently, the velox backend only supports bucketed tables compatible with Hive and - // is limited to partitioned tables. Therefore, we should add this condition restriction. - // After velox supports bucketed non-partitioned tables, we can remove the restriction on - // partitioned tables. - if (bucketSpec.isEmpty || isHiveCompatibleBucketTable) { + if ( + bucketSpec.nonEmpty && !options + .getOrElse("__hive_compatible_bucketed_table_insertion__", "false") + .equals("true") + ) { + Some("Unsupported native write: non-compatible hive bucket write is not supported.") + } else { None + } + } + + def validateDataTypes(): Option[String] = { + def hasUnsupportedType(dt: DataType): Boolean = dt match { + case _: YearMonthIntervalType => true + case st: StructType => st.fields.exists(f => hasUnsupportedType(f.dataType)) + case at: ArrayType => hasUnsupportedType(at.elementType) + case mt: MapType => hasUnsupportedType(mt.keyType) || hasUnsupportedType(mt.valueType) + case _ => false + } + + val unsupported = fields.filter(f => hasUnsupportedType(f.dataType)) + if (unsupported.nonEmpty) { + Some( + unsupported + .map(_.dataType.simpleString) + .mkString("Found unsupported type:", ",", "")) } else { - Some("Unsupported native write: non-compatible hive bucket write is not supported.") + None } } - validateCompressionCodec() - .orElse(validateFileFormat()) + def validateFieldMetadata(): Option[String] = { + fields.find(_.metadata != Metadata.empty).map { + filed => + s"StructField contain the metadata information: $filed, metadata: ${filed.metadata}" 
+ } + } + + validateFileFormat() + .orElse(validateCompressionCodec()) .orElse(validateFieldMetadata()) .orElse(validateDataTypes()) .orElse(validateWriteFilesOptions()) @@ -406,14 +379,6 @@ object VeloxBackendSettings extends BackendSettingsApi { } } - override def supportNativeWrite(fields: Array[StructField]): Boolean = { - def isNotSupported(dataType: DataType): Boolean = dataType match { - case _: StructType | _: ArrayType | _: MapType => true - case _ => false - } - !fields.exists(field => isNotSupported(field.dataType)) - } - override def supportExpandExec(): Boolean = true override def supportSortExec(): Boolean = true diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala index 1b0d0647dd9d..a4577c8a5b22 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala @@ -28,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.hadoop.util.HadoopInputFile class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteUtils { + override protected val resourcePath: String = "/tpch-data-parquet" override protected val fileFormat: String = "parquet" @@ -62,9 +63,10 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU withTempPath { f => val path = f.getCanonicalPath + // TODO: maybe remove constant complex type restriction (Spark 3.4+) checkNativeWrite( s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET SELECT array(struct(1), null) as var1", - expectNative = false) + expectNative = !isSparkVersionGE("3.4")) } } @@ -120,6 +122,57 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU } } + test("test insert into with struct type") { + withTable("t", "src") { + spark.sql("CREATE TABLE 
src (info STRUCT<name: STRING, age: INT>) USING PARQUET") + checkNativeWrite("INSERT INTO src SELECT named_struct('name', 'alice', 'age', 30)") + spark.sql("CREATE TABLE t (info STRUCT<name: STRING, age: INT>) USING PARQUET") + checkNativeWrite("INSERT INTO t SELECT info FROM src") + checkAnswer(spark.table("t"), Row(Row("alice", 30))) + } + } + + test("test insert into with array type") { + withTable("t", "src") { + spark.sql("CREATE TABLE src (ids ARRAY<INT>) USING PARQUET") + // TODO: maybe remove constant complex type restriction (Spark 3.4+) + checkNativeWrite( + "INSERT INTO src SELECT array(1, 2, 3)", + expectNative = !isSparkVersionGE("3.4")) + spark.sql("CREATE TABLE t (ids ARRAY<INT>) USING PARQUET") + checkNativeWrite("INSERT INTO t SELECT ids FROM src") + checkAnswer(spark.table("t"), Row(Seq(1, 2, 3))) + } + } + + test("test insert into with map type") { + withTable("t", "src") { + spark.sql("CREATE TABLE src (kv MAP<STRING, INT>) USING PARQUET") + // TODO: maybe remove constant complex type restriction (Spark 3.4+) + checkNativeWrite( + "INSERT INTO src SELECT map('a', 1, 'b', 2)", + expectNative = !isSparkVersionGE("3.4")) + spark.sql("CREATE TABLE t (kv MAP<STRING, INT>) USING PARQUET") + checkNativeWrite("INSERT INTO t SELECT kv FROM src") + checkAnswer(spark.table("t"), Row(Map("a" -> 1, "b" -> 2))) + } + } + + test("test insert into with nested struct type") { + withTable("t", "src") { + spark.sql( + """CREATE TABLE src (info STRUCT<name: STRING, addr: STRUCT<city: STRING, zip: INT>>) + |USING PARQUET""".stripMargin) + checkNativeWrite( + "INSERT INTO src SELECT named_struct('name', 'alice', " + + "'addr', named_struct('city', 'hangzhou', 'zip', 310000))") + spark.sql("""CREATE TABLE t (info STRUCT<name: STRING, addr: STRUCT<city: STRING, zip: INT>>) + |USING PARQUET""".stripMargin) + checkNativeWrite("INSERT INTO t SELECT info FROM src") + checkAnswer(spark.table("t"), Row(Row("alice", Row("hangzhou", 310000)))) + } + } + test("test ctas") { + withTable("velox_ctas") { + spark diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 8dd3156099e9..79274b4c6015 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -59,8 +59,6 @@ trait BackendSettingsApi { isPartitionedTable: Boolean, options: Map[String, String]): ValidationResult = ValidationResult.succeeded - def supportNativeWrite(fields: Array[StructField]): Boolean = true - def supportNativeMetadataColumns(): Boolean = true def supportNativeRowIndexColumn(): Boolean = true diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 606fa377b8a7..346d6b7f4dce 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -101,10 +101,7 @@ object GlutenWriterColumnarRules { case rc @ DataWritingCommandExec(cmd, child) => // The same thread can set these properties in the last query submission. val format = - if ( - BackendsApiManager.getSettings.supportNativeWrite(child.schema.fields) && - BackendsApiManager.getSettings.enableNativeWriteFiles() - ) { + if (BackendsApiManager.getSettings.enableNativeWriteFiles()) { getNativeFormat(cmd) } else { None