Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.delta

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.datasource.GlutenFormatFactory

import org.apache.spark.internal.Logging
Expand All @@ -31,16 +30,11 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil
import org.slf4j.LoggerFactory

class GlutenParquetFileFormat
extends ParquetFileFormat
with DataSourceRegister
with Logging
with Serializable {
import GlutenParquetFileFormat._

private val logger = LoggerFactory.getLogger(classOf[GlutenParquetFileFormat])

override def shortName(): String = "gluten-parquet"

Expand All @@ -62,39 +56,27 @@ class GlutenParquetFileFormat
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
if (isNativeWritable(dataSchema)) {
// Pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
val nativeConf =
GlutenFormatFactory("parquet")
.nativeConf(options, parquetOptions.compressionCodecClassName)
// Pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
val nativeConf =
GlutenFormatFactory("parquet")
.nativeConf(options, parquetOptions.compressionCodecClassName)

return new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
CodecConfig.from(context).getCodec.getExtension + ".parquet"
}
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
CodecConfig.from(context).getCodec.getExtension + ".parquet"
}

override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
GlutenFormatFactory("parquet")
.createOutputWriter(path, dataSchema, context, nativeConf)
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
GlutenFormatFactory("parquet")
.createOutputWriter(path, dataSchema, context, nativeConf)

}
}
}
logger.warn(
s"Data schema is unsupported by Gluten Parquet writer: $dataSchema, " +
s"falling back to the vanilla Spark Parquet writer")
super.prepareWrite(sparkSession, job, options, dataSchema)
}
}

/** Companion object exposing the native-writability check used by the writer path. */
object GlutenParquetFileFormat {

  /**
   * Whether the active backend reports it can natively write every field of `schema`.
   * Delegates the per-field capability check to the backend's settings.
   */
  def isNativeWritable(schema: StructType): Boolean =
    BackendsApiManager.getSettings.supportNativeWrite(schema.fields)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.delta.{DeltaOptions, GlutenParquetFileFormat}
import org.apache.spark.sql.delta.DeltaOptions
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.stats.GlutenDeltaJobStatsTracker
import org.apache.spark.sql.errors.QueryExecutionErrors
Expand Down Expand Up @@ -126,7 +126,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
val dataSchema = dataColumns.toStructType
DataSourceUtils.verifySchema(fileFormat, dataSchema)
DataSourceUtils.checkFieldNames(fileFormat, dataSchema)
val isNativeWritable = GlutenParquetFileFormat.isNativeWritable(dataSchema)
val isNativeWritable = true

val outputDataColumns =
if (caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,89 +285,38 @@ object VeloxBackendSettings extends BackendSettingsApi {
isPartitionedTable: Boolean,
options: Map[String, String]): ValidationResult = {

// Validate if HiveFileFormat write is supported based on output file type
def validateHiveFileFormat(hiveFileFormat: HiveFileFormat): Option[String] = {
// Reflect to get access to fileSinkConf which contains the output file format
val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf")
fileSinkConfField.setAccessible(true)
val fileSinkConf = fileSinkConfField.get(hiveFileFormat)
val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo")
tableInfoField.setAccessible(true)
val tableInfo = tableInfoField.get(fileSinkConf)
val getOutputFileFormatClassNameMethod = tableInfo.getClass
.getDeclaredMethod("getOutputFileFormatClassName")
val outputFileFormatClassName = getOutputFileFormatClassNameMethod.invoke(tableInfo)

// Match based on the output file format class name
outputFileFormatClassName match {
case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" =>
None
case _ =>
Some(
"HiveFileFormat is supported only with Parquet as the output file type"
) // Unsupported format
// Checks whether `format` (from the enclosing scope) is a file format the native
// writer can handle. Returns None when supported, or Some(reason) otherwise.
def validateFileFormat(): Option[String] = {
  format match {
    // Plain parquet writes are supported directly.
    case _: ParquetFileFormat => None
    case h: HiveFileFormat if GlutenConfig.get.enableHiveFileFormatWriter =>
      // Validate HiveFileFormat is backed by Parquet.
      // `fileSinkConf` and `tableInfo` are not publicly accessible, so reflection is
      // used to reach the configured output file format class name.
      val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf")
      fileSinkConfField.setAccessible(true)
      val fileSinkConf = fileSinkConfField.get(h)
      val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo")
      tableInfoField.setAccessible(true)
      val tableInfo = tableInfoField.get(fileSinkConf)
      val outputFormat = tableInfo.getClass
        .getDeclaredMethod("getOutputFileFormatClassName")
        .invoke(tableInfo)
      outputFormat match {
        case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" => None
        case _ => Some("HiveFileFormat is supported only with Parquet as the output file type")
      }
    // Any other format is rejected, as is HiveFileFormat when the hive-writer
    // config flag is disabled (the guard above then does not match).
    case _ => Some("Only ParquetFileFormat and HiveFileFormat are supported.")
  }
}

def validateCompressionCodec(): Option[String] = {
val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options)
if (unSupportedCompressions.contains(compressionCodec)) {
Some(s"$compressionCodec compression codec is unsupported in Velox backend.")
val unsupported = Set("brotli", "lzo", "lz4raw", "lz4_raw")
val codec = WriteFilesExecTransformer.getCompressionCodec(options)
if (unsupported.contains(codec)) {
Some(s"$codec compression codec is unsupported in Velox backend.")
} else {
None
}
}

// Validate if all types are supported.
// For ParquetFileFormat, top-level YearMonthIntervalType and StructType fields are
// rejected; for any other format, all complex types (struct/array/map) as well as
// YearMonthIntervalType are rejected. Returns Some(message) listing the offending
// type names, or None when every field is writable.
def validateDataTypes(): Option[String] = {
  val unsupportedTypes = format match {
    case _: ParquetFileFormat =>
      fields.flatMap {
        case StructField(_, _: YearMonthIntervalType, _, _) =>
          Some("YearMonthIntervalType")
        case StructField(_, _: StructType, _, _) =>
          Some("StructType")
        case _ => None
      }
    case _ =>
      fields.flatMap {
        field =>
          field.dataType match {
            case _: StructType => Some("StructType")
            case _: ArrayType => Some("ArrayType")
            case _: MapType => Some("MapType")
            case _: YearMonthIntervalType => Some("YearMonthIntervalType")
            case _ => None
          }
      }
  }
  // Fold all offending type names into a single validation message.
  if (unsupportedTypes.nonEmpty) {
    Some(unsupportedTypes.mkString("Found unsupported type:", ",", ""))
  } else {
    None
  }
}

// Rejects schemas whose fields carry non-empty metadata.
// NOTE(review): `filed` is a typo for `field`, and the message grammar is off;
// kept byte-identical here because renaming is a code change.
def validateFieldMetadata(): Option[String] = {
  fields.find(_.metadata != Metadata.empty).map {
    filed =>
      s"StructField contain the metadata information: $filed, metadata: ${filed.metadata}"
  }
}

// Returns None when `format` can be written natively, or Some(reason) otherwise.
// Hive-backed writes are delegated to validateHiveFileFormat, and only when the
// hive-writer config flag is enabled.
def validateFileFormat(): Option[String] = {
  format match {
    case _: ParquetFileFormat => None // Parquet is directly supported
    case h: HiveFileFormat if GlutenConfig.get.enableHiveFileFormatWriter =>
      validateHiveFileFormat(h) // Parquet via Hive SerDe
    case _ =>
      Some(
        "Only ParquetFileFormat and HiveFileFormat are supported."
      ) // Unsupported format
  }
}

def validateWriteFilesOptions(): Option[String] = {
val maxRecordsPerFile = options
.get("maxRecordsPerFile")
Expand All @@ -381,22 +330,46 @@ object VeloxBackendSettings extends BackendSettingsApi {
}

def validateBucketSpec(): Option[String] = {
val isHiveCompatibleBucketTable = bucketSpec.nonEmpty && options
.getOrElse("__hive_compatible_bucketed_table_insertion__", "false")
.equals("true")
// Currently, the velox backend only supports bucketed tables compatible with Hive and
// is limited to partitioned tables. Therefore, we should add this condition restriction.
// After velox supports bucketed non-partitioned tables, we can remove the restriction on
// partitioned tables.
if (bucketSpec.isEmpty || isHiveCompatibleBucketTable) {
if (
bucketSpec.nonEmpty && !options
.getOrElse("__hive_compatible_bucketed_table_insertion__", "false")
.equals("true")
) {
Some("Unsupported native write: non-compatible hive bucket write is not supported.")
} else {
None
}
}

def validateDataTypes(): Option[String] = {
def hasUnsupportedType(dt: DataType): Boolean = dt match {
case _: YearMonthIntervalType => true
case st: StructType => st.fields.exists(f => hasUnsupportedType(f.dataType))
case at: ArrayType => hasUnsupportedType(at.elementType)
case mt: MapType => hasUnsupportedType(mt.keyType) || hasUnsupportedType(mt.valueType)
case _ => false
}

val unsupported = fields.filter(f => hasUnsupportedType(f.dataType))
if (unsupported.nonEmpty) {
Some(
unsupported
.map(_.dataType.simpleString)
.mkString("Found unsupported type:", ",", ""))
} else {
Some("Unsupported native write: non-compatible hive bucket write is not supported.")
None
}
}

validateCompressionCodec()
.orElse(validateFileFormat())
/**
 * Rejects schemas that carry field-level metadata.
 * Returns None when every field's metadata is empty, otherwise a message naming the
 * first offending field and its metadata.
 */
def validateFieldMetadata(): Option[String] = {
  fields.find(_.metadata != Metadata.empty).map {
    // Fixed local-name typo (`filed` -> `field`) and message grammar.
    field =>
      s"StructField contains the metadata information: $field, metadata: ${field.metadata}"
  }
}

validateFileFormat()
.orElse(validateCompressionCodec())
.orElse(validateFieldMetadata())
.orElse(validateDataTypes())
.orElse(validateWriteFilesOptions())
Expand All @@ -406,14 +379,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
}

/**
 * Whether the backend can natively write a relation with the given fields.
 * A schema is writable only when no top-level field has a complex type
 * (struct, array, or map).
 */
override def supportNativeWrite(fields: Array[StructField]): Boolean = {
  fields.forall {
    field =>
      field.dataType match {
        case _: StructType | _: ArrayType | _: MapType => false
        case _ => true
      }
  }
}

override def supportExpandExec(): Boolean = true

override def supportSortExec(): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteUtils {

override protected val resourcePath: String = "/tpch-data-parquet"
override protected val fileFormat: String = "parquet"

Expand Down Expand Up @@ -62,9 +63,10 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU
withTempPath {
f =>
val path = f.getCanonicalPath
// TODO: maybe remove constant complex type restriction (Spark 3.4+)
checkNativeWrite(
s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET SELECT array(struct(1), null) as var1",
expectNative = false)
expectNative = !isSparkVersionGE("3.4"))
}
}

Expand Down Expand Up @@ -120,6 +122,57 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteU
}
}

// Verifies native parquet write of a STRUCT column, both from a literal
// (named_struct) and from a table scan, then checks the round-tripped value.
test("test insert into with struct type") {
  withTable("t", "src") {
    spark.sql("CREATE TABLE src (info STRUCT<name: STRING, age: INT>) USING PARQUET")
    checkNativeWrite("INSERT INTO src SELECT named_struct('name', 'alice', 'age', 30)")
    spark.sql("CREATE TABLE t (info STRUCT<name: STRING, age: INT>) USING PARQUET")
    checkNativeWrite("INSERT INTO t SELECT info FROM src")
    checkAnswer(spark.table("t"), Row(Row("alice", 30)))
  }
}

// Verifies native parquet write of an ARRAY column. The literal insert expects a
// fallback on Spark 3.4+ (per the expectNative flag below); the scan-based insert
// is expected to stay native.
test("test insert into with array type") {
  withTable("t", "src") {
    spark.sql("CREATE TABLE src (ids ARRAY<INT>) USING PARQUET")
    // TODO: maybe remove constant complex type restriction (Spark 3.4+)
    checkNativeWrite(
      "INSERT INTO src SELECT array(1, 2, 3)",
      expectNative = !isSparkVersionGE("3.4"))
    spark.sql("CREATE TABLE t (ids ARRAY<INT>) USING PARQUET")
    checkNativeWrite("INSERT INTO t SELECT ids FROM src")
    checkAnswer(spark.table("t"), Row(Seq(1, 2, 3)))
  }
}

// Verifies native parquet write of a MAP column. The literal insert expects a
// fallback on Spark 3.4+ (per the expectNative flag below); the scan-based insert
// is expected to stay native.
test("test insert into with map type") {
  withTable("t", "src") {
    spark.sql("CREATE TABLE src (kv MAP<STRING, INT>) USING PARQUET")
    // TODO: maybe remove constant complex type restriction (Spark 3.4+)
    checkNativeWrite(
      "INSERT INTO src SELECT map('a', 1, 'b', 2)",
      expectNative = !isSparkVersionGE("3.4"))
    spark.sql("CREATE TABLE t (kv MAP<STRING, INT>) USING PARQUET")
    checkNativeWrite("INSERT INTO t SELECT kv FROM src")
    checkAnswer(spark.table("t"), Row(Map("a" -> 1, "b" -> 2)))
  }
}

// Verifies native parquet write of a STRUCT nested inside a STRUCT, from both a
// literal and a table scan, then checks the round-tripped nested value.
test("test insert into with nested struct type") {
  withTable("t", "src") {
    spark.sql(
      """CREATE TABLE src (info STRUCT<name: STRING, addr: STRUCT<city: STRING, zip: INT>>)
        |USING PARQUET""".stripMargin)
    checkNativeWrite(
      "INSERT INTO src SELECT named_struct('name', 'alice', " +
        "'addr', named_struct('city', 'hangzhou', 'zip', 310000))")
    spark.sql("""CREATE TABLE t (info STRUCT<name: STRING, addr: STRUCT<city: STRING, zip: INT>>)
                |USING PARQUET""".stripMargin)
    checkNativeWrite("INSERT INTO t SELECT info FROM src")
    checkAnswer(spark.table("t"), Row(Row("alice", Row("hangzhou", 310000))))
  }
}

test("test ctas") {
withTable("velox_ctas") {
spark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ trait BackendSettingsApi {
isPartitionedTable: Boolean,
options: Map[String, String]): ValidationResult = ValidationResult.succeeded

def supportNativeWrite(fields: Array[StructField]): Boolean = true

def supportNativeMetadataColumns(): Boolean = true

def supportNativeRowIndexColumn(): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ object GlutenWriterColumnarRules {
case rc @ DataWritingCommandExec(cmd, child) =>
// The same thread can set these properties in the last query submission.
val format =
if (
BackendsApiManager.getSettings.supportNativeWrite(child.schema.fields) &&
BackendsApiManager.getSettings.enableNativeWriteFiles()
) {
if (BackendsApiManager.getSettings.enableNativeWriteFiles()) {
getNativeFormat(cmd)
} else {
Comment on lines 101 to 106
None
Expand Down
Loading