From 1514adeeb4b009dedc66b74cebacbdb1df528bc0 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 19 Mar 2026 11:39:15 +0800 Subject: [PATCH 1/3] native write --- .../sql/delta/GlutenParquetFileFormat.scala | 48 +++---- .../files/GlutenDeltaFileFormatWriter.scala | 4 +- .../backendsapi/velox/VeloxBackend.scala | 126 +++++++----------- .../execution/VeloxParquetWriteSuite.scala | 48 +++++++ .../backendsapi/BackendSettingsApi.scala | 2 - .../GlutenWriterColumnarRules.scala | 5 +- 6 files changed, 117 insertions(+), 116 deletions(-) diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala index 91bc39bd2e4b..c4ffb47d7199 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala @@ -16,7 +16,6 @@ */ package org.apache.spark.sql.delta -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.datasource.GlutenFormatFactory import org.apache.spark.internal.Logging @@ -38,7 +37,6 @@ class GlutenParquetFileFormat with DataSourceRegister with Logging with Serializable { - import GlutenParquetFileFormat._ private val logger = LoggerFactory.getLogger(classOf[GlutenParquetFileFormat]) @@ -62,39 +60,27 @@ class GlutenParquetFileFormat job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - if (isNativeWritable(dataSchema)) { - // Pass compression to job conf so that the file extension can be aware of it. - val conf = ContextUtil.getConfiguration(job) - val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) - conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) - val nativeConf = - GlutenFormatFactory("parquet") - .nativeConf(options, parquetOptions.compressionCodecClassName) + // Pass compression to job conf so that the file extension can be aware of it. + val conf = ContextUtil.getConfiguration(job) + val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) + conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) + val nativeConf = + GlutenFormatFactory("parquet") + .nativeConf(options, parquetOptions.compressionCodecClassName) - return new OutputWriterFactory { - override def getFileExtension(context: TaskAttemptContext): String = { - CodecConfig.from(context).getCodec.getExtension + ".parquet" - } + new OutputWriterFactory { + override def getFileExtension(context: TaskAttemptContext): String = { + CodecConfig.from(context).getCodec.getExtension + ".parquet" + } - override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - GlutenFormatFactory("parquet") - .createOutputWriter(path, dataSchema, context, nativeConf) + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + GlutenFormatFactory("parquet") + .createOutputWriter(path, dataSchema, context, nativeConf) - } } } - logger.warn( - s"Data schema is unsupported by Gluten Parquet writer: $dataSchema, " + - s"falling back to the vanilla Spark Parquet writer") - super.prepareWrite(sparkSession, job, options, dataSchema) - } -} - -object GlutenParquetFileFormat { - def isNativeWritable(schema: StructType): Boolean = { - BackendsApiManager.getSettings.supportNativeWrite(schema.fields) } } diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala index 653b705becf0..ee3b49e47eef 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.connector.write.WriterCommitMessage -import org.apache.spark.sql.delta.{DeltaOptions, GlutenParquetFileFormat} +import org.apache.spark.sql.delta.DeltaOptions import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.stats.GlutenDeltaJobStatsTracker import org.apache.spark.sql.errors.QueryExecutionErrors @@ -126,7 +126,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { val dataSchema = dataColumns.toStructType DataSourceUtils.verifySchema(fileFormat, dataSchema) DataSourceUtils.checkFieldNames(fileFormat, dataSchema) - val isNativeWritable = GlutenParquetFileFormat.isNativeWritable(dataSchema) + val isNativeWritable = true val outputDataColumns = if (caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true")) { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 6d5a2a6c2abd..79e7a10071ac 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -285,49 +285,68 @@ object VeloxBackendSettings extends BackendSettingsApi { isPartitionedTable: Boolean, options: Map[String, String]): ValidationResult = { - // Validate if HiveFileFormat write is supported based on output file type - def validateHiveFileFormat(hiveFileFormat: HiveFileFormat): Option[String] = { - // Reflect to get access to fileSinkConf which contains the output file format - val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf") - fileSinkConfField.setAccessible(true) - val fileSinkConf = fileSinkConfField.get(hiveFileFormat) - val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo") - tableInfoField.setAccessible(true) - val tableInfo = tableInfoField.get(fileSinkConf) - val getOutputFileFormatClassNameMethod = tableInfo.getClass - .getDeclaredMethod("getOutputFileFormatClassName") - val outputFileFormatClassName = getOutputFileFormatClassNameMethod.invoke(tableInfo) - - // Match based on the output file format class name - outputFileFormatClassName match { - case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" => - None - case _ => - Some( - "HiveFileFormat is supported only with Parquet as the output file type" - ) // Unsupported format + def validateFileFormat(): Option[String] = { + format match { + case _: ParquetFileFormat => None + case h: HiveFileFormat if GlutenConfig.get.enableHiveFileFormatWriter => + // Validate HiveFileFormat is backed by Parquet + val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf") + fileSinkConfField.setAccessible(true) + val fileSinkConf = fileSinkConfField.get(h) + val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo") + tableInfoField.setAccessible(true) + val tableInfo = tableInfoField.get(fileSinkConf) + val outputFormat = tableInfo.getClass + .getDeclaredMethod("getOutputFileFormatClassName") + .invoke(tableInfo) + outputFormat match { + case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" => None + case _ => Some("HiveFileFormat is supported only with Parquet as the output file type") + } + case _ => Some("Only ParquetFileFormat and HiveFileFormat are supported.") } } def validateCompressionCodec(): Option[String] = { - val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw") - val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options) - if (unSupportedCompressions.contains(compressionCodec)) { - Some(s"$compressionCodec compression codec is unsupported in Velox backend.") + val unsupported = Set("brotli", "lzo", "lz4raw", "lz4_raw") + val codec = WriteFilesExecTransformer.getCompressionCodec(options) + if (unsupported.contains(codec)) { + Some(s"$codec compression codec is unsupported in Velox backend.") + } else { + None + } + } + + def validateWriteFilesOptions(): Option[String] = { + val maxRecordsPerFile = options + .get("maxRecordsPerFile") + .map(_.toLong) + .getOrElse(SQLConf.get.maxRecordsPerFile) + if (maxRecordsPerFile > 0) { + Some("Unsupported native write: maxRecordsPerFile not supported.") + } else { + None + } + } + + def validateBucketSpec(): Option[String] = { + if ( + bucketSpec.nonEmpty && !options + .getOrElse("__hive_compatible_bucketed_table_insertion__", "false") + .equals("true") + ) { + Some("Unsupported native write: non-compatible hive bucket write is not supported.") } else { None } } - // Validate if all types are supported. def validateDataTypes(): Option[String] = { val unsupportedTypes = format match { case _: ParquetFileFormat => fields.flatMap { case StructField(_, _: YearMonthIntervalType, _, _) => Some("YearMonthIntervalType") - case StructField(_, _: StructType, _, _) => - Some("StructType") case _ => None } case _ => @@ -356,47 +375,8 @@ object VeloxBackendSettings extends BackendSettingsApi { } } - def validateFileFormat(): Option[String] = { - format match { - case _: ParquetFileFormat => None // Parquet is directly supported - case h: HiveFileFormat if GlutenConfig.get.enableHiveFileFormatWriter => - validateHiveFileFormat(h) // Parquet via Hive SerDe - case _ => - Some( - "Only ParquetFileFormat and HiveFileFormat are supported." - ) // Unsupported format - } - } - - def validateWriteFilesOptions(): Option[String] = { - val maxRecordsPerFile = options - .get("maxRecordsPerFile") - .map(_.toLong) - .getOrElse(SQLConf.get.maxRecordsPerFile) - if (maxRecordsPerFile > 0) { - Some("Unsupported native write: maxRecordsPerFile not supported.") - } else { - None - } - } - - def validateBucketSpec(): Option[String] = { - val isHiveCompatibleBucketTable = bucketSpec.nonEmpty && options - .getOrElse("__hive_compatible_bucketed_table_insertion__", "false") - .equals("true") - // Currently, the velox backend only supports bucketed tables compatible with Hive and - // is limited to partitioned tables. Therefore, we should add this condition restriction. - // After velox supports bucketed non-partitioned tables, we can remove the restriction on - // partitioned tables. - if (bucketSpec.isEmpty || isHiveCompatibleBucketTable) { - None - } else { - Some("Unsupported native write: non-compatible hive bucket write is not supported.") - } - } - - validateCompressionCodec() - .orElse(validateFileFormat()) + validateFileFormat() + .orElse(validateCompressionCodec()) .orElse(validateFieldMetadata()) .orElse(validateDataTypes()) .orElse(validateWriteFilesOptions()) @@ -406,14 +386,6 @@ object VeloxBackendSettings extends BackendSettingsApi { } } - override def supportNativeWrite(fields: Array[StructField]): Boolean = { - def isNotSupported(dataType: DataType): Boolean = dataType match { - case _: StructType | _: ArrayType | _: MapType => true - case _ => false - } - !fields.exists(field => isNotSupported(field.dataType)) - } - override def supportExpandExec(): Boolean = true override def supportSortExec(): Boolean = true diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala index 1b0d0647dd9d..f952bc5019af 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala @@ -28,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.hadoop.util.HadoopInputFile class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteUtils { + override protected val resourcePath: String = "/tpch-data-parquet" override protected val fileFormat: String = "parquet" @@ -120,6 +121,53 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU } } + test("test insert into with struct type") { + withTable("t", "src") { + spark.sql("CREATE TABLE src (info STRUCT) USING PARQUET") + checkNativeWrite("INSERT INTO src SELECT named_struct('name', 'alice', 'age', 30)") + spark.sql("CREATE TABLE t (info STRUCT) USING PARQUET") + checkNativeWrite("INSERT INTO t SELECT info FROM src") + checkAnswer(spark.table("t"), Row(Row("alice", 30))) + } + } + + test("test insert into with array type") { + withTable("t", "src") { + spark.sql("CREATE TABLE src (ids ARRAY) USING PARQUET") + // todo: support native write with constant array + checkNativeWrite("INSERT INTO src SELECT array(1, 2, 3)", expectNative = false) + spark.sql("CREATE TABLE t (ids ARRAY) USING PARQUET") + checkNativeWrite("INSERT INTO t SELECT ids FROM src") + checkAnswer(spark.table("t"), Row(Seq(1, 2, 3))) + } + } + + test("test insert into with map type") { + withTable("t", "src") { + spark.sql("CREATE TABLE src (kv MAP) USING PARQUET") + // todo: support native write with constant map + checkNativeWrite("INSERT INTO src SELECT map('a', 1, 'b', 2)", expectNative = false) + spark.sql("CREATE TABLE t (kv MAP) USING PARQUET") + checkNativeWrite("INSERT INTO t SELECT kv FROM src") + checkAnswer(spark.table("t"), Row(Map("a" -> 1, "b" -> 2))) + } + } + + test("test insert into with nested struct type") { + withTable("t", "src") { + spark.sql( + """CREATE TABLE src (info STRUCT>) + |USING PARQUET""".stripMargin) + checkNativeWrite( + "INSERT INTO src SELECT named_struct('name', 'alice', " + + "'addr', named_struct('city', 'hangzhou', 'zip', 310000))") + spark.sql("""CREATE TABLE t (info STRUCT>) + |USING PARQUET""".stripMargin) + checkNativeWrite("INSERT INTO t SELECT info FROM src") + checkAnswer(spark.table("t"), Row(Row("alice", Row("hangzhou", 310000)))) + } + } + test("test ctas") { withTable("velox_ctas") { spark diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 8dd3156099e9..79274b4c6015 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -59,8 +59,6 @@ trait BackendSettingsApi { isPartitionedTable: Boolean, options: Map[String, String]): ValidationResult = ValidationResult.succeeded - def supportNativeWrite(fields: Array[StructField]): Boolean = true - def supportNativeMetadataColumns(): Boolean = true def supportNativeRowIndexColumn(): Boolean = true diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 606fa377b8a7..346d6b7f4dce 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -101,10 +101,7 @@ object GlutenWriterColumnarRules { case rc @ DataWritingCommandExec(cmd, child) => // The same thread can set these properties in the last query submission. val format = - if ( - BackendsApiManager.getSettings.supportNativeWrite(child.schema.fields) && - BackendsApiManager.getSettings.enableNativeWriteFiles() - ) { + if (BackendsApiManager.getSettings.enableNativeWriteFiles()) { getNativeFormat(cmd) } else { None From ac1b295786629107a21c9b08eacf80b45fdc8dc0 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Fri, 20 Mar 2026 01:08:39 +0800 Subject: [PATCH 2/3] fix test --- .../backendsapi/velox/VeloxBackend.scala | 33 ++++++++----------- .../execution/VeloxParquetWriteSuite.scala | 15 ++++++--- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 79e7a10071ac..7fb04d28ae5f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -342,27 +342,20 @@ object VeloxBackendSettings extends BackendSettingsApi { } def validateDataTypes(): Option[String] = { - val unsupportedTypes = format match { - case _: ParquetFileFormat => - fields.flatMap { - case StructField(_, _: YearMonthIntervalType, _, _) => - Some("YearMonthIntervalType") - case _ => None - } - case _ => - fields.flatMap { - field => - field.dataType match { - case _: StructType => Some("StructType") - case _: ArrayType => Some("ArrayType") - case _: MapType => Some("MapType") - case _: YearMonthIntervalType => Some("YearMonthIntervalType") - case _ => None - } - } + def hasUnsupportedType(dt: DataType): Boolean = dt match { + case _: YearMonthIntervalType => true + case st: StructType => st.fields.exists(f => hasUnsupportedType(f.dataType)) + case at: ArrayType => hasUnsupportedType(at.elementType) + case mt: MapType => hasUnsupportedType(mt.keyType) || hasUnsupportedType(mt.valueType) + case _ => false } - if (unsupportedTypes.nonEmpty) { - Some(unsupportedTypes.mkString("Found unsupported type:", ",", "")) + + val unsupported = fields.filter(f => hasUnsupportedType(f.dataType)) + if (unsupported.nonEmpty) { + Some( + unsupported + .map(_.dataType.simpleString) + .mkString("Found unsupported type:", ",", "")) } else { None } diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala index f952bc5019af..03f6ecc056c1 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala @@ -63,9 +63,10 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU withTempPath { f => val path = f.getCanonicalPath + // TODO: maybe remove constant complex type restriction in WriteFilesExecTransformer (Spark 3.4+) checkNativeWrite( s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET SELECT array(struct(1), null) as var1", - expectNative = false) + expectNative = !isSparkVersionGE("3.4")) } } @@ -134,8 +135,10 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU test("test insert into with array type") { withTable("t", "src") { spark.sql("CREATE TABLE src (ids ARRAY) USING PARQUET") - // todo: support native write with constant array - checkNativeWrite("INSERT INTO src SELECT array(1, 2, 3)", expectNative = false) + // TODO: maybe remove constant complex type restriction in WriteFilesExecTransformer (Spark 3.4+) + checkNativeWrite( + "INSERT INTO src SELECT array(1, 2, 3)", + expectNative = !isSparkVersionGE("3.4")) spark.sql("CREATE TABLE t (ids ARRAY) USING PARQUET") checkNativeWrite("INSERT INTO t SELECT ids FROM src") checkAnswer(spark.table("t"), Row(Seq(1, 2, 3))) @@ -145,8 +148,10 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU test("test insert into with map type") { withTable("t", "src") { spark.sql("CREATE TABLE src (kv MAP) USING PARQUET") - // todo: support native write with constant map - checkNativeWrite("INSERT INTO src SELECT map('a', 1, 'b', 2)", expectNative = false) + // TODO: maybe remove constant complex type restriction in WriteFilesExecTransformer (Spark 3.4+) + checkNativeWrite( + "INSERT INTO src SELECT map('a', 1, 'b', 2)", + expectNative = !isSparkVersionGE("3.4")) spark.sql("CREATE TABLE t (kv MAP) USING PARQUET") checkNativeWrite("INSERT INTO t SELECT kv FROM src") checkAnswer(spark.table("t"), Row(Map("a" -> 1, "b" -> 2))) From 3d5d937c3e2e26e89defeeecfd619376fa2b728a Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Fri, 20 Mar 2026 01:27:50 +0800 Subject: [PATCH 3/3] format --- .../apache/spark/sql/delta/GlutenParquetFileFormat.scala | 4 ---- .../apache/spark/sql/execution/VeloxParquetWriteSuite.scala | 6 +++--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala index c4ffb47d7199..038afa38982b 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala @@ -30,16 +30,12 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil -import org.slf4j.LoggerFactory - class GlutenParquetFileFormat extends ParquetFileFormat with DataSourceRegister with Logging with Serializable { - private val logger = LoggerFactory.getLogger(classOf[GlutenParquetFileFormat]) - override def shortName(): String = "gluten-parquet" override def toString: String = "GlutenParquet" diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala index 03f6ecc056c1..a4577c8a5b22 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala @@ -63,7 +63,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU withTempPath { f => val path = f.getCanonicalPath - // TODO: maybe remove constant complex type restriction in WriteFilesExecTransformer (Spark 3.4+) + // TODO: maybe remove constant complex type restriction (Spark 3.4+) checkNativeWrite( s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET SELECT array(struct(1), null) as var1", expectNative = !isSparkVersionGE("3.4")) @@ -135,7 +135,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU test("test insert into with array type") { withTable("t", "src") { spark.sql("CREATE TABLE src (ids ARRAY) USING PARQUET") - // TODO: maybe remove constant complex type restriction in WriteFilesExecTransformer (Spark 3.4+) + // TODO: maybe remove constant complex type restriction (Spark 3.4+) checkNativeWrite( "INSERT INTO src SELECT array(1, 2, 3)", expectNative = !isSparkVersionGE("3.4")) @@ -148,7 +148,7 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU test("test insert into with map type") { withTable("t", "src") { spark.sql("CREATE TABLE src (kv MAP) USING PARQUET") - // TODO: maybe remove constant complex type restriction in WriteFilesExecTransformer (Spark 3.4+) + // TODO: maybe remove constant complex type restriction (Spark 3.4+) checkNativeWrite( "INSERT INTO src SELECT map('a', 1, 'b', 2)", expectNative = !isSparkVersionGE("3.4"))