From ef34493183330082505be87e58c78f9e995a418a Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sat, 10 Jan 2026 09:34:53 -0800 Subject: [PATCH 1/6] int_to_binary --- .../source/user-guide/latest/compatibility.md | 11 ++--- .../spark-expr/src/conversion_funcs/cast.rs | 41 +++++++++++++++---- .../apache/comet/expressions/CometCast.scala | 9 +++- .../org/apache/comet/CometCastSuite.scala | 2 +- 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md index 64bd9d2bcd..f2002351f0 100644 --- a/docs/source/user-guide/latest/compatibility.md +++ b/docs/source/user-guide/latest/compatibility.md @@ -73,16 +73,17 @@ should not be used in production. The feature will be enabled in a future releas Cast operations in Comet fall into three levels of support: -- **C (Compatible)**: The results match Apache Spark -- **I (Incompatible)**: The results may match Apache Spark for some inputs, but there are known issues where some inputs +- **Compatible**: The results match Apache Spark +- **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting `spark.comet.expression.Cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not recommended for production use. -- **U (Unsupported)**: Comet does not provide a native version of this cast expression and the query stage will fall back to +- **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to Spark. -- **N/A**: Spark does not support this cast. -### Legacy Mode +### Compatible Casts + +The following cast operations are generally compatible with Spark except for the differences noted here. 
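
As a hedged aside on the `spark.comet.expression.Cast.allowIncompatible` flag documented above: the snippet below is an illustration only, not part of this patch, and assumes an active SparkSession bound to `spark`.

    // Sketch: opting one session in to Comet's incompatible cast paths.
    // Key quoted verbatim from the compatibility guide above; per that guide,
    // this is not recommended for production use.
    spark.conf.set("spark.comet.expression.Cast.allowIncompatible", "true")
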
diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs
index 9ccfc3e6af..68cd5d7f6b 100644
--- a/native/spark-expr/src/conversion_funcs/cast.rs
+++ b/native/spark-expr/src/conversion_funcs/cast.rs
@@ -20,13 +20,13 @@ use crate::{timezone, BinaryOutputStyle};
 use crate::{EvalMode, SparkError, SparkResult};
 use arrow::array::builder::StringBuilder;
 use arrow::array::{
-    BooleanBuilder, Decimal128Builder, DictionaryArray, GenericByteArray, ListArray,
+    BinaryBuilder, BooleanBuilder, Decimal128Builder, DictionaryArray, GenericByteArray, ListArray,
     PrimitiveBuilder, StringArray, StructArray,
 };
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{
-    i256, ArrowDictionaryKeyType, ArrowNativeType, DataType, Decimal256Type, GenericBinaryType,
-    Schema,
+    i256, ArrowDictionaryKeyType, ArrowNativeType, DataType, Decimal256Type,
+    GenericBinaryType, Schema,
 };
 use arrow::{
     array::{
@@ -311,7 +311,7 @@ fn can_cast_from_byte(to_type: &DataType, _: &SparkCastOptions) -> bool {
     use DataType::*;
     matches!(
         to_type,
-        Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _)
+        Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _) | Binary
     )
 }
 
@@ -319,14 +319,14 @@ fn can_cast_from_short(to_type: &DataType, _: &SparkCastOptions) -> bool {
     use DataType::*;
     matches!(
         to_type,
-        Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _)
+        Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _) | Binary
     )
 }
 
 fn can_cast_from_int(to_type: &DataType, options: &SparkCastOptions) -> bool {
     use DataType::*;
     match to_type {
-        Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Utf8 => true,
+        Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Utf8 | Binary => true,
         Decimal128(_, _) => {
             // incompatible: no overflow check
             options.allow_incompat
@@ -338,7 +338,7 @@ fn can_cast_from_long(to_type: &DataType, options: &SparkCastOptions) -> bool {
     use DataType::*;
     match to_type {
-        Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 => true,
+        Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Binary => true,
         Decimal128(_, _) => {
             // incompatible: no overflow check
             options.allow_incompat
@@ -501,6 +501,29 @@ macro_rules! cast_float_to_string {
     }};
 }
 
+// Eval mode is not needed: every integral value has a well-defined big-endian binary representation
+macro_rules! cast_whole_num_to_binary {
+    ($array:expr, $primitive_type:ty, $byte_size:expr) => {{
+        let input_arr = $array
+            .as_any()
+            .downcast_ref::<$primitive_type>()
+            .ok_or_else(|| SparkError::Internal("Expected numeric array".to_string()))?;
+
+        let len = input_arr.len();
+        let mut builder = BinaryBuilder::with_capacity(len, len * $byte_size);
+
+        for i in 0..input_arr.len() {
+            if input_arr.is_null(i) {
+                builder.append_null();
+            } else {
+                builder.append_value(input_arr.value(i).to_be_bytes());
+            }
+        }
+
+        Ok(Arc::new(builder.finish()) as ArrayRef)
+    }};
+}
+
 macro_rules! cast_int_to_int_macro {
     (
         $array: expr,
@@ -1100,6 +1123,10 @@ fn cast_array(
             Ok(cast_with_options(&array, to_type, &CAST_OPTIONS)?)
         }
         (Binary, Utf8) => Ok(cast_binary_to_string::<i32>(&array, cast_options)?),
+        (Int8, Binary) => cast_whole_num_to_binary!(&array, Int8Array, 1),
+        (Int16, Binary) => cast_whole_num_to_binary!(&array, Int16Array, 2),
+        (Int32, Binary) => cast_whole_num_to_binary!(&array, Int32Array, 4),
+        (Int64, Binary) => cast_whole_num_to_binary!(&array, Int64Array, 8),
         _ if cast_options.is_adapting_schema
             || is_datafusion_spark_compatible(from_type, to_type) =>
         {
diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
index 9fc4b3afdf..7ce986fb54 100644
--- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
+++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
@@ -21,7 +21,7 @@ package org.apache.comet.expressions
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, DataTypes, DecimalType, NullType, StructType}
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -126,6 +126,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
       isSupported(dt.elementType, DataTypes.StringType, timeZoneId, evalMode)
     case (dt: ArrayType, dt1: ArrayType) =>
       isSupported(dt.elementType, dt1.elementType, timeZoneId, evalMode)
+    case (from: DataType, _: BinaryType) => canCastToBinary(from)
     case (dt: DataType, _) if dt.typeName == "timestamp_ntz" =>
       // https://github.com/apache/datafusion-comet/issues/378
       toType match {
@@ -344,6 +345,12 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
     case _ => Unsupported(Some(s"Cast from DecimalType to $toType is not supported"))
   }
 
+  private def canCastToBinary(fromType: DataType): SupportLevel = fromType match {
+    case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType =>
+      Compatible()
+    case _ => Unsupported(Some(s"Cast from $fromType to BinaryType is not supported"))
+  }
+
   private def unsupported(fromType: DataType, toType: DataType): Unsupported = {
     Unsupported(Some(s"Cast from $fromType to $toType is not supported"))
   }
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 8a68df3820..c0bcd01349 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -281,7 +281,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }
 
-  ignore("cast ShortType to BinaryType") {
+  test("cast ShortType to BinaryType") {
     castTest(
       generateShorts(),
       DataTypes.BinaryType,

From 4e6b6bbb3ac7ef2c4eaa77c5d04b78afd378b6b4 Mon Sep 17 00:00:00 2001
From: B Vadlamani
Date: Tue, 13 Jan 2026 15:37:07 -0800
Subject: [PATCH 2/6] int_to_binary

---
 .../spark-expr/src/conversion_funcs/cast.rs   |   9 +-
 .../apache/comet/expressions/CometCast.scala  | 129 +++++++++++-------
 .../org/apache/comet/CometCastSuite.scala     |  55 ++++----
 3 files changed, 116 insertions(+), 77 deletions(-)

diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs
index 68cd5d7f6b..0ffddcbeb6 100644
--- a/native/spark-expr/src/conversion_funcs/cast.rs
+++ b/native/spark-expr/src/conversion_funcs/cast.rs
@@ -66,6 +66,7 @@ use std::{
     num::Wrapping,
     sync::Arc,
 };
+use crate::EvalMode::Legacy;
 
 static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");
 
@@ -1123,10 +1124,10 @@ fn cast_array(
             Ok(cast_with_options(&array, to_type, &CAST_OPTIONS)?)
         }
         (Binary, Utf8) => Ok(cast_binary_to_string::<i32>(&array, cast_options)?),
-        (Int8, Binary) => cast_whole_num_to_binary!(&array, Int8Array, 1),
-        (Int16, Binary) => cast_whole_num_to_binary!(&array, Int16Array, 2),
-        (Int32, Binary) => cast_whole_num_to_binary!(&array, Int32Array, 4),
-        (Int64, Binary) => cast_whole_num_to_binary!(&array, Int64Array, 8),
+        (Int8, Binary) if (eval_mode == Legacy) => cast_whole_num_to_binary!(&array, Int8Array, 1),
+        (Int16, Binary) if (eval_mode == Legacy) => cast_whole_num_to_binary!(&array, Int16Array, 2),
+        (Int32, Binary) if (eval_mode == Legacy) => cast_whole_num_to_binary!(&array, Int32Array, 4),
+        (Int64, Binary) if (eval_mode == Legacy) => cast_whole_num_to_binary!(&array, Int64Array, 8),
         _ if cast_options.is_adapting_schema
             || is_datafusion_spark_compatible(from_type, to_type) =>
         {
diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
index 7ce986fb54..565a967803 100644
--- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
+++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
@@ -126,7 +126,6 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
       isSupported(dt.elementType, DataTypes.StringType, timeZoneId, evalMode)
     case (dt: ArrayType, dt1: ArrayType) =>
       isSupported(dt.elementType, dt1.elementType, timeZoneId, evalMode)
-    case (from: DataType, _: BinaryType) => canCastToBinary(from)
     case (dt: DataType, _) if dt.typeName == "timestamp_ntz" =>
       // https://github.com/apache/datafusion-comet/issues/378
       toType match {
@@ -148,13 +147,13 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
     case (DataTypes.BooleanType, _) =>
       canCastFromBoolean(toType)
     case (DataTypes.ByteType, _) =>
-      canCastFromByte(toType)
+      canCastFromByte(toType, evalMode)
     case (DataTypes.ShortType, _) =>
-      canCastFromShort(toType)
+      canCastFromShort(toType, evalMode)
     case (DataTypes.IntegerType, _) =>
-      canCastFromInt(toType)
+      canCastFromInt(toType, evalMode)
     case (DataTypes.LongType, _) =>
-      canCastFromLong(toType)
+      canCastFromLong(toType, evalMode)
     case (DataTypes.FloatType, _) =>
       canCastFromFloat(toType)
     case (DataTypes.DoubleType, _) =>
@@ -269,53 +268,85 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
     case _ => unsupported(DataTypes.BooleanType, toType)
   }
 
-  private def canCastFromByte(toType: DataType): SupportLevel = toType match {
-    case DataTypes.BooleanType =>
-      Compatible()
-    case DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType =>
-      Compatible()
-    case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType =>
-      Compatible()
-    case _ =>
-      unsupported(DataTypes.ByteType, toType)
-  }
+  private def canCastFromByte(toType: DataType, evalMode: CometEvalMode.Value): SupportLevel =
+    toType match {
+      case DataTypes.BooleanType =>
+        Compatible()
+      case DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType =>
+        Compatible()
+      case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType =>
+        Compatible()
+      case DataTypes.BinaryType =>
+        if (evalMode == CometEvalMode.LEGACY) {
+          Compatible()
+        } else {
+          Unsupported(
+            Some(s"Spark does not support byte to 
binary conversion in ${evalMode} eval mode")) + } + case _ => + unsupported(DataTypes.ByteType, toType) + } - private def canCastFromShort(toType: DataType): SupportLevel = toType match { - case DataTypes.BooleanType => - Compatible() - case DataTypes.ByteType | DataTypes.IntegerType | DataTypes.LongType => - Compatible() - case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType => - Compatible() - case _ => - unsupported(DataTypes.ShortType, toType) - } + private def canCastFromShort(toType: DataType, evalMode: CometEvalMode.Value): SupportLevel = + toType match { + case DataTypes.BooleanType => + Compatible() + case DataTypes.ByteType | DataTypes.IntegerType | DataTypes.LongType => + Compatible() + case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType => + Compatible() + case DataTypes.BinaryType => + if (evalMode == CometEvalMode.LEGACY) { + Compatible() + } else { + Unsupported( + Some(s"Spark does not support short to binary conversion in ${evalMode} eval mode")) + } + case _ => + unsupported(DataTypes.ShortType, toType) + } - private def canCastFromInt(toType: DataType): SupportLevel = toType match { - case DataTypes.BooleanType => - Compatible() - case DataTypes.ByteType | DataTypes.ShortType | DataTypes.LongType => - Compatible() - case DataTypes.FloatType | DataTypes.DoubleType => - Compatible() - case _: DecimalType => - Compatible() - case _ => - unsupported(DataTypes.IntegerType, toType) - } + private def canCastFromInt(toType: DataType, evalMode: CometEvalMode.Value): SupportLevel = + toType match { + case DataTypes.BooleanType => + Compatible() + case DataTypes.ByteType | DataTypes.ShortType | DataTypes.LongType => + Compatible() + case DataTypes.FloatType | DataTypes.DoubleType => + Compatible() + case _: DecimalType => + Compatible() + case DataTypes.BinaryType => + if (evalMode == CometEvalMode.LEGACY) { + Compatible() + } else { + Unsupported( + Some(s"Spark does not support int to binary conversion in ${evalMode} eval mode")) + } + case _ => + unsupported(DataTypes.IntegerType, toType) + } - private def canCastFromLong(toType: DataType): SupportLevel = toType match { - case DataTypes.BooleanType => - Compatible() - case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType => - Compatible() - case DataTypes.FloatType | DataTypes.DoubleType => - Compatible() - case _: DecimalType => - Compatible() - case _ => - unsupported(DataTypes.LongType, toType) - } + private def canCastFromLong(toType: DataType, evalMode: CometEvalMode.Value): SupportLevel = + toType match { + case DataTypes.BooleanType => + Compatible() + case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType => + Compatible() + case DataTypes.FloatType | DataTypes.DoubleType => + Compatible() + case _: DecimalType => + Compatible() + case DataTypes.BinaryType => + if (evalMode == CometEvalMode.LEGACY) { + Compatible() + } else { + Unsupported( + Some(s"Spark does not support long to binary conversion in ${evalMode} eval mode")) + } + case _ => + unsupported(DataTypes.LongType, toType) + } private def canCastFromFloat(toType: DataType): SupportLevel = toType match { case DataTypes.BooleanType | DataTypes.DoubleType | DataTypes.ByteType | DataTypes.ShortType | diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index c0bcd01349..04c685f1da 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -207,11 
+207,12 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }
 
-  ignore("cast ByteType to BinaryType") {
+  test("cast ByteType to BinaryType") {
+    // Spark does not support ANSI or Try mode
     castTest(
       generateBytes(),
       DataTypes.BinaryType,
-      hasIncompatibleType = usingParquetExecWithIncompatTypes)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes, testAnsi = false, testTry = false)
   }
 
   ignore("cast ByteType to TimestampType") {
@@ -282,10 +283,11 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("cast ShortType to BinaryType") {
+    // Spark does not support ANSI or Try mode
     castTest(
       generateShorts(),
       DataTypes.BinaryType,
-      hasIncompatibleType = usingParquetExecWithIncompatTypes)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes, testAnsi = false, testTry = false)
   }
 
   ignore("cast ShortType to TimestampType") {
@@ -346,8 +348,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(generateInts(), DataTypes.StringType)
   }
 
-  ignore("cast IntegerType to BinaryType") {
-    castTest(generateInts(), DataTypes.BinaryType)
+  test("cast IntegerType to BinaryType") {
+    // Spark does not support ANSI or Try mode
+    castTest(generateInts(), DataTypes.BinaryType, testAnsi = false, testTry = false)
   }
 
   ignore("cast IntegerType to TimestampType") {
@@ -392,8 +395,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(generateLongs(), DataTypes.StringType)
   }
 
-  ignore("cast LongType to BinaryType") {
-    castTest(generateLongs(), DataTypes.BinaryType)
+  test("cast LongType to BinaryType") {
+    // Spark does not support ANSI or Try mode
+    castTest(generateLongs(), DataTypes.BinaryType , testAnsi = false, testTry = false)
   }
 
   ignore("cast LongType to TimestampType") {
@@ -1329,28 +1333,30 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       input: DataFrame,
       toType: DataType,
       hasIncompatibleType: Boolean = false,
-      testAnsi: Boolean = true): Unit = {
+      testAnsi: Boolean = true,
+      testTry: Boolean = true): Unit = {
     withTempPath { dir =>
       val data = roundtripParquet(input, dir).coalesce(1)
-      data.createOrReplaceTempView("t")
 
      withSQLConf((SQLConf.ANSI_ENABLED.key, "false")) {
         // cast() should return null for invalid inputs when ansi mode is disabled
-        val df = spark.sql(s"select a, cast(a as ${toType.sql}) from t order by a")
+        val df = data.select(col("a"), col("a").cast(toType)).orderBy(col("a"))
         if (hasIncompatibleType) {
           checkSparkAnswer(df)
         } else {
           checkSparkAnswerAndOperator(df)
         }
 
-        // try_cast() should always return null for invalid inputs
-        val df2 =
-          spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by a")
-        if (hasIncompatibleType) {
-          checkSparkAnswer(df2)
-        } else {
-          checkSparkAnswerAndOperator(df2)
+        if (testTry){
+          // try_cast() should always return null for invalid inputs
+          val df2 =
+            data.select(col("a"), col("a").try_cast(toType)).orderBy(col("a"))
+          if (hasIncompatibleType) {
+            checkSparkAnswer(df2)
+          } else {
+            checkSparkAnswerAndOperator(df2)
+          }
         }
       }
 
@@ -1408,14 +1414,15 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
 
       // try_cast() should always return null for invalid inputs
-      val df2 =
-        spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by a")
-      if (hasIncompatibleType) {
-        checkSparkAnswer(df2)
-      } else {
-        checkSparkAnswerAndOperator(df2)
+      if (testTry){
+        val df2 =
+          data.select(col("a"), 
col("a").cast(toType)).orderBy(col("a")) + if (hasIncompatibleType) { + checkSparkAnswer(df2) + } else { + checkSparkAnswerAndOperator(df2) + } } - } } } From 3e93600d0b6c62bd3aa1d515d6a41b50ba6dddf7 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 13 Jan 2026 16:01:47 -0800 Subject: [PATCH 3/6] int_to_binary --- .../apache/comet/expressions/CometCast.scala | 34 ++++--------------- .../org/apache/comet/CometCastSuite.scala | 16 +++++---- 2 files changed, 16 insertions(+), 34 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 565a967803..28b760ac6c 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -276,13 +276,8 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { Compatible() case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType => Compatible() - case DataTypes.BinaryType => - if (evalMode == CometEvalMode.LEGACY) { - Compatible() - } else { - Unsupported( - Some(s"Spark does not support byte to binary conversion in ${evalMode} eval mode")) - } + case DataTypes.BinaryType if (evalMode == CometEvalMode.LEGACY) => + Compatible() case _ => unsupported(DataTypes.ByteType, toType) } @@ -295,13 +290,8 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { Compatible() case DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType => Compatible() - case DataTypes.BinaryType => - if (evalMode == CometEvalMode.LEGACY) { - Compatible() - } else { - Unsupported( - Some(s"Spark does not support short to binary conversion in ${evalMode} eval mode")) - } + case DataTypes.BinaryType if (evalMode == CometEvalMode.LEGACY) => + Compatible() case _ => unsupported(DataTypes.ShortType, toType) } @@ -316,13 +306,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { Compatible() case _: DecimalType => Compatible() - case DataTypes.BinaryType => - if (evalMode == CometEvalMode.LEGACY) { - Compatible() - } else { - Unsupported( - Some(s"Spark does not support int to binary conversion in ${evalMode} eval mode")) - } + case DataTypes.BinaryType if (evalMode == CometEvalMode.LEGACY) => Compatible() case _ => unsupported(DataTypes.IntegerType, toType) } @@ -337,13 +321,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { Compatible() case _: DecimalType => Compatible() - case DataTypes.BinaryType => - if (evalMode == CometEvalMode.LEGACY) { - Compatible() - } else { - Unsupported( - Some(s"Spark does not support long to binary conversion in ${evalMode} eval mode")) - } + case DataTypes.BinaryType if (evalMode == CometEvalMode.LEGACY) => Compatible() case _ => unsupported(DataTypes.LongType, toType) } diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 04c685f1da..afb85e601a 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -212,7 +212,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest( generateBytes(), DataTypes.BinaryType, - hasIncompatibleType = usingParquetExecWithIncompatTypes, testAnsi = false, testTry = false) + hasIncompatibleType = usingParquetExecWithIncompatTypes, + testAnsi = false, + testTry = false) } ignore("cast ByteType to TimestampType") { @@ -287,7 +289,9 @@ class 
CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest( generateShorts(), DataTypes.BinaryType, - hasIncompatibleType = usingParquetExecWithIncompatTypes, testAnsi = false, testTry = false) + hasIncompatibleType = usingParquetExecWithIncompatTypes, + testAnsi = false, + testTry = false) } ignore("cast ShortType to TimestampType") { @@ -397,7 +401,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast LongType to BinaryType") { // Spark does not support ANSI or Try mode - castTest(generateLongs(), DataTypes.BinaryType , testAnsi = false, testTry = false) + castTest(generateLongs(), DataTypes.BinaryType, testAnsi = false, testTry = false) } ignore("cast LongType to TimestampType") { @@ -1348,7 +1352,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkAnswerAndOperator(df) } - if (testTry){ + if (testTry) { // try_cast() should always return null for invalid inputs val df2 = data.select(col("a"), col("a").try_cast(toType)).orderBy(col("a")) @@ -1414,9 +1418,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } // try_cast() should always return null for invalid inputs - if (testTry){ + if (testTry) { val df2 = - data.select(col("a"), col("a").cast(toType)).orderBy(col("a")) + data.select(col("a"), col("a").try_cast(toType)).orderBy(col("a")) if (hasIncompatibleType) { checkSparkAnswer(df2) } else { From 3f7940ab35f4cedc0dc56fce56ed8b510b432b3a Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 13 Jan 2026 18:51:41 -0800 Subject: [PATCH 4/6] int_to_binary_boolean_to_decimal --- .../spark-expr/src/conversion_funcs/cast.rs | 36 +++++++++++++++---- .../apache/comet/expressions/CometCast.scala | 4 +-- .../org/apache/comet/CometCastSuite.scala | 18 +++++++--- 3 files changed, 45 insertions(+), 13 deletions(-) diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs index 0ffddcbeb6..bfbcb0edcf 100644 --- a/native/spark-expr/src/conversion_funcs/cast.rs +++ b/native/spark-expr/src/conversion_funcs/cast.rs @@ -16,6 +16,7 @@ // under the License. 
 use crate::utils::array_with_timezone;
+use crate::EvalMode::Legacy;
 use crate::{timezone, BinaryOutputStyle};
 use crate::{EvalMode, SparkError, SparkResult};
 use arrow::array::builder::StringBuilder;
@@ -25,8 +26,8 @@ use arrow::array::{
 };
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{
-    i256, ArrowDictionaryKeyType, ArrowNativeType, DataType, Decimal256Type,
-    GenericBinaryType, Schema,
+    i256, ArrowDictionaryKeyType, ArrowNativeType, DataType, Decimal256Type, GenericBinaryType,
+    Schema,
 };
 use arrow::{
     array::{
@@ -66,7 +67,6 @@ use std::{
     num::Wrapping,
     sync::Arc,
 };
-use crate::EvalMode::Legacy;
 
 static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");
 
@@ -305,7 +305,10 @@ fn can_cast_from_timestamp(to_type: &DataType, _options: &SparkCastOptions) -> b
 fn can_cast_from_boolean(to_type: &DataType, _: &SparkCastOptions) -> bool {
     use DataType::*;
-    matches!(to_type, Int8 | Int16 | Int32 | Int64 | Float32 | Float64)
+    matches!(
+        to_type,
+        Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _)
+    )
 }
@@ -1125,9 +1128,18 @@ fn cast_array(
         }
         (Binary, Utf8) => Ok(cast_binary_to_string::<i32>(&array, cast_options)?),
         (Int8, Binary) if (eval_mode == Legacy) => cast_whole_num_to_binary!(&array, Int8Array, 1),
-        (Int16, Binary) if (eval_mode == Legacy) => cast_whole_num_to_binary!(&array, Int16Array, 2),
-        (Int32, Binary) if (eval_mode == Legacy) => cast_whole_num_to_binary!(&array, Int32Array, 4),
-        (Int64, Binary) if (eval_mode == Legacy) => cast_whole_num_to_binary!(&array, Int64Array, 8),
+        (Int16, Binary) if (eval_mode == Legacy) => {
+            cast_whole_num_to_binary!(&array, Int16Array, 2)
+        }
+        (Int32, Binary) if (eval_mode == Legacy) => {
+            cast_whole_num_to_binary!(&array, Int32Array, 4)
+        }
+        (Int64, Binary) if (eval_mode == Legacy) => {
+            cast_whole_num_to_binary!(&array, Int64Array, 8)
+        }
+        (Boolean, Decimal128(precision, scale)) => {
+            cast_boolean_to_decimal(&array, *precision, *scale)
+        }
         _ if cast_options.is_adapting_schema
             || is_datafusion_spark_compatible(from_type, to_type) =>
         {
@@ -1146,6 +1158,16 @@ fn cast_array(
     Ok(spark_cast_postprocess(cast_result?, from_type, to_type))
 }
 
+fn cast_boolean_to_decimal(array: &ArrayRef, precision: u8, scale: i8) -> SparkResult<ArrayRef> {
+    let bool_array = array.as_boolean();
+    let scale_factor = 10_i128.pow(scale as u32);
+    let result: Decimal128Array = bool_array
+        .iter()
+        .map(|v| v.map(|b| if b { scale_factor } else { 0 }))
+        .collect();
+    Ok(Arc::new(result.with_precision_and_scale(precision, scale)?))
+}
+
 fn cast_string_to_float(
     array: &ArrayRef,
     to_type: &DataType,
diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
index 28b760ac6c..4be9a48eaf 100644
--- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
+++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
@@ -21,7 +21,7 @@ package org.apache.comet.expressions
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, DataTypes, DecimalType, NullType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType}
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -263,7 +263,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
   private def canCastFromBoolean(toType: DataType): SupportLevel = toType match {
     case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType |
-        DataTypes.FloatType | DataTypes.DoubleType =>
+        DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType =>
       Compatible()
     case _ => unsupported(DataTypes.BooleanType, toType)
   }
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index afb85e601a..9127d85476 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -135,11 +135,18 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(generateBools(), DataTypes.DoubleType)
   }
 
-  ignore("cast BooleanType to DecimalType(10,2)") {
-    // Arrow error: Cast error: Casting from Boolean to Decimal128(10, 2) not supported
+  test("cast BooleanType to DecimalType(10,2)") {
     castTest(generateBools(), DataTypes.createDecimalType(10, 2))
   }
 
+  test("cast BooleanType to DecimalType(14,4)") {
+    castTest(generateBools(), DataTypes.createDecimalType(14, 4))
+  }
+
+  test("cast BooleanType to DecimalType(30,0)") {
+    castTest(generateBools(), DataTypes.createDecimalType(30, 0))
+  }
+
   test("cast BooleanType to StringType") {
     castTest(generateBools(), DataTypes.StringType)
   }
@@ -1353,9 +1360,11 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
 
       if (testTry) {
+        data.createOrReplaceTempView("t")
         // try_cast() should always return null for invalid inputs
+        // not using the Spark DSL since `try_cast` is only available in Spark 4.x
         val df2 =
-          data.select(col("a"), col("a").try_cast(toType)).orderBy(col("a"))
+          spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by a")
         if (hasIncompatibleType) {
           checkSparkAnswer(df2)
         } else {
@@ -1419,8 +1428,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       // try_cast() should always return null for invalid inputs
       if (testTry) {
+        data.createOrReplaceTempView("t")
         val df2 =
-          data.select(col("a"), col("a").try_cast(toType)).orderBy(col("a"))
+          spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by a")
         if (hasIncompatibleType) {
           checkSparkAnswer(df2)
         } else {

From fb8814f1fb98c878d260dd6b6ec31c27cd021841 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Sat, 24 Jan 2026 23:57:52 -0800
Subject: [PATCH 5/6] fix_test_failures_review_comments

---
 native/spark-expr/src/conversion_funcs/cast.rs              | 4 ++--
 .../main/scala/org/apache/comet/expressions/CometCast.scala | 6 ------
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs
index bfbcb0edcf..086af2d312 100644
--- a/native/spark-expr/src/conversion_funcs/cast.rs
+++ b/native/spark-expr/src/conversion_funcs/cast.rs
@@ -1160,10 +1160,10 @@ fn cast_array(
 fn cast_boolean_to_decimal(array: &ArrayRef, precision: u8, scale: i8) -> SparkResult<ArrayRef> {
     let bool_array = array.as_boolean();
-    let scale_factor = 10_i128.pow(scale as u32);
+    let scaled_value = 10_i128.pow(scale as u32);
     let result: Decimal128Array = bool_array
         .iter()
-        .map(|v| v.map(|b| if b { scale_factor } else { 0 }))
+        .map(|v| v.map(|b| if b { scaled_value } else { 0 }))
         .collect();
     Ok(Arc::new(result.with_precision_and_scale(precision, scale)?))
 }
diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
index 4be9a48eaf..853a288eb7 100644
--- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
+++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
@@ -354,12 +354,6 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
     case _ => Unsupported(Some(s"Cast from DecimalType to $toType is not supported"))
   }
 
-  private def canCastToBinary(fromType: DataType): SupportLevel = fromType match {
-    case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType =>
-      Compatible()
-    case _ => Unsupported(Some(s"Cast from $fromType to BinaryType is not supported"))
-  }
-
   private def unsupported(fromType: DataType, toType: DataType): Unsupported = {
     Unsupported(Some(s"Cast from $fromType to $toType is not supported"))
   }

From d1f1512fa2e0c75abb72254f1cd751b1748a2081 Mon Sep 17 00:00:00 2001
From: Bhargava Vadlamani
Date: Sun, 25 Jan 2026 00:34:45 -0800
Subject: [PATCH 6/6] fix_test_failures_review_comments

---
 docs/source/user-guide/latest/compatibility.md | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md
index f2002351f0..64bd9d2bcd 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -73,17 +73,16 @@ should not be used in production. The feature will be enabled in a future releas
 
 Cast operations in Comet fall into three levels of support:
 
-- **Compatible**: The results match Apache Spark
-- **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
+- **C (Compatible)**: The results match Apache Spark
+- **I (Incompatible)**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
   will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
   `spark.comet.expression.Cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but
   this is not recommended for production use.
-- **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to
+- **U (Unsupported)**: Comet does not provide a native version of this cast expression and the query stage will fall back to
   Spark.
+- **N/A**: Spark does not support this cast.
 
-### Compatible Casts
-
-The following cast operations are generally compatible with Spark except for the differences noted here.
+### Legacy Mode
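
For reference, a self-contained Scala sketch of the Spark cast semantics the patches above implement natively. The object and method names here are illustrative only (they are not Comet or Spark APIs): legacy-mode CAST(int AS BINARY) produces the value's big-endian two's-complement bytes, which is what the Rust side's `to_be_bytes()` returns, and CAST(boolean AS DECIMAL(p, s)) produces 1 or 0 carried as an unscaled integer times 10^s, matching `cast_boolean_to_decimal`.

    import java.nio.ByteBuffer

    object CastSemanticsSketch extends App {
      // Legacy-mode CAST(int AS BINARY): big-endian two's-complement bytes.
      def intToBinary(v: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(v).array()

      // CAST(boolean AS DECIMAL(p, s)): true -> 1, false -> 0, stored as an
      // unscaled value multiplied by 10^s.
      def booleanToUnscaled(b: Boolean, scale: Int): BigInt =
        if (b) BigInt(10).pow(scale) else BigInt(0)

      assert(intToBinary(1).sameElements(Array[Byte](0, 0, 0, 1)))      // X'00000001'
      assert(intToBinary(-1).sameElements(Array[Byte](-1, -1, -1, -1))) // X'FFFFFFFF'
      assert(booleanToUnscaled(b = true, scale = 2) == BigInt(100))     // DECIMAL(10,2) 1.00
      assert(booleanToUnscaled(b = false, scale = 2) == BigInt(0))      // DECIMAL(10,2) 0.00
    }

In Spark SQL terms, `SELECT cast(1 as binary)` yields X'00000001' and `SELECT cast(true as decimal(10,2))` yields 1.00, which is what the suite's `castTest` checks Comet's native results against.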