diff --git a/.gitignore b/.gitignore index 9978e37bdf..02c5e2c4a0 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ spark/benchmarks .DS_Store comet-event-trace.json __pycache__ +CLAUDE.md diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md index 0ca6f8ea97..14a830395b 100644 --- a/docs/source/user-guide/latest/compatibility.md +++ b/docs/source/user-guide/latest/compatibility.md @@ -58,6 +58,22 @@ Expressions that are not 100% Spark-compatible will fall back to Spark by defaul `spark.comet.expression.EXPRNAME.allowIncompatible=true`, where `EXPRNAME` is the Spark expression class name. See the [Comet Supported Expressions Guide](expressions.md) for more information on this configuration setting. +## Date and Time Functions + +Comet's native implementation of date and time functions may produce different results than Spark for dates +far in the future (approximately beyond year 2100). This is because Comet uses the chrono-tz library for +timezone calculations, which has limited support for Daylight Saving Time (DST) rules beyond the IANA +time zone database's explicit transitions. + +For dates within a reasonable range (approximately 1970-2100), Comet's date and time functions are compatible +with Spark. For dates beyond this range, functions that involve timezone-aware calculations (such as +`date_trunc` with timezone-aware timestamps) may produce results with incorrect DST offsets. 
+ +If you need to process dates far in the future with accurate timezone handling, consider: + +- Using timezone-naive types (`timestamp_ntz`) when timezone conversion is not required +- Falling back to Spark for these specific operations + ## Regular Expressions Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's @@ -88,20 +104,21 @@ Cast operations in Comet fall into three levels of support: -| | binary | boolean | byte | date | decimal | double | float | integer | long | short | string | timestamp | -|---|---|---|---|---|---|---|---|---|---|---|---|---| -| binary | - | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | C | N/A | -| boolean | N/A | - | C | N/A | U | C | C | C | C | C | C | U | -| byte | U | C | - | N/A | C | C | C | C | C | C | C | U | -| date | N/A | U | U | - | U | U | U | U | U | U | C | U | -| decimal | N/A | C | C | N/A | - | C | C | C | C | C | C | U | -| double | N/A | C | C | N/A | I | - | C | C | C | C | C | U | -| float | N/A | C | C | N/A | I | C | - | C | C | C | C | U | -| integer | U | C | C | N/A | C | C | C | - | C | C | C | U | -| long | U | C | C | N/A | C | C | C | C | - | C | C | U | -| short | U | C | C | N/A | C | C | C | C | C | - | C | U | -| string | C | C | C | C | I | C | C | C | C | C | - | I | -| timestamp | N/A | U | U | C | U | U | U | U | C | U | C | - | +| | binary | boolean | byte | date | decimal | double | float | integer | long | short | string | timestamp | timestamp_ntz | +|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---| +| binary | - | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | C | N/A | N/A | +| boolean | N/A | - | C | N/A | U | C | C | C | C | C | C | U | N/A | +| byte | U | C | - | N/A | C | C | C | C | C | C | C | U | N/A | +| date | N/A | U | U | - | U | U | U | U | U | U | C | U | U | +| decimal | N/A | C | C | N/A | - | C | C | C | C | C | C | U | N/A | +| double | N/A | C | C | N/A | I | - | C | C | C | C | C | U | 
N/A | +| float | N/A | C | C | N/A | I | C | - | C | C | C | C | U | N/A | +| integer | U | C | C | N/A | C | C | C | - | C | C | C | U | N/A | +| long | U | C | C | N/A | C | C | C | C | - | C | C | U | N/A | +| short | U | C | C | N/A | C | C | C | C | C | - | C | U | N/A | +| string | C | C | C | C | I | C | C | C | C | C | - | I | U | +| timestamp | N/A | U | U | C | U | U | U | U | C | U | C | - | U | +| timestamp_ntz | N/A | N/A | N/A | C | N/A | N/A | N/A | N/A | N/A | N/A | C | C | - | **Notes:** @@ -123,20 +140,21 @@ Cast operations in Comet fall into three levels of support: -| | binary | boolean | byte | date | decimal | double | float | integer | long | short | string | timestamp | -|---|---|---|---|---|---|---|---|---|---|---|---|---| -| binary | - | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | C | N/A | -| boolean | N/A | - | C | N/A | U | C | C | C | C | C | C | U | -| byte | U | C | - | N/A | C | C | C | C | C | C | C | U | -| date | N/A | U | U | - | U | U | U | U | U | U | C | U | -| decimal | N/A | C | C | N/A | - | C | C | C | C | C | C | U | -| double | N/A | C | C | N/A | I | - | C | C | C | C | C | U | -| float | N/A | C | C | N/A | I | C | - | C | C | C | C | U | -| integer | U | C | C | N/A | C | C | C | - | C | C | C | U | -| long | U | C | C | N/A | C | C | C | C | - | C | C | U | -| short | U | C | C | N/A | C | C | C | C | C | - | C | U | -| string | C | C | C | C | I | C | C | C | C | C | - | I | -| timestamp | N/A | U | U | C | U | U | U | U | C | U | C | - | +| | binary | boolean | byte | date | decimal | double | float | integer | long | short | string | timestamp | timestamp_ntz | +|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---| +| binary | - | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | C | N/A | N/A | +| boolean | N/A | - | C | N/A | U | C | C | C | C | C | C | U | N/A | +| byte | U | C | - | N/A | C | C | C | C | C | C | C | U | N/A | +| date | N/A | U | U | - | U | U | U | U | U | U | C | U | U | +| 
decimal | N/A | C | C | N/A | - | C | C | C | C | C | C | U | N/A | +| double | N/A | C | C | N/A | I | - | C | C | C | C | C | U | N/A | +| float | N/A | C | C | N/A | I | C | - | C | C | C | C | U | N/A | +| integer | U | C | C | N/A | C | C | C | - | C | C | C | U | N/A | +| long | U | C | C | N/A | C | C | C | C | - | C | C | U | N/A | +| short | U | C | C | N/A | C | C | C | C | C | - | C | U | N/A | +| string | C | C | C | C | I | C | C | C | C | C | - | I | U | +| timestamp | N/A | U | U | C | U | U | U | U | C | U | C | - | U | +| timestamp_ntz | N/A | N/A | N/A | C | N/A | N/A | N/A | N/A | N/A | N/A | C | C | - | **Notes:** @@ -158,20 +176,21 @@ Cast operations in Comet fall into three levels of support: -| | binary | boolean | byte | date | decimal | double | float | integer | long | short | string | timestamp | -|---|---|---|---|---|---|---|---|---|---|---|---|---| -| binary | - | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | C | N/A | -| boolean | N/A | - | C | N/A | U | C | C | C | C | C | C | U | -| byte | U | C | - | N/A | C | C | C | C | C | C | C | U | -| date | N/A | U | U | - | U | U | U | U | U | U | C | U | -| decimal | N/A | C | C | N/A | - | C | C | C | C | C | C | U | -| double | N/A | C | C | N/A | I | - | C | C | C | C | C | U | -| float | N/A | C | C | N/A | I | C | - | C | C | C | C | U | -| integer | U | C | C | N/A | C | C | C | - | C | C | C | U | -| long | U | C | C | N/A | C | C | C | C | - | C | C | U | -| short | U | C | C | N/A | C | C | C | C | C | - | C | U | -| string | C | C | C | C | I | C | C | C | C | C | - | I | -| timestamp | N/A | U | U | C | U | U | U | U | C | U | C | - | +| | binary | boolean | byte | date | decimal | double | float | integer | long | short | string | timestamp | timestamp_ntz | +|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---| +| binary | - | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | N/A | C | N/A | N/A | +| boolean | N/A | - | C | N/A | U | C | C | C | C | C | C | U | N/A | 
+| byte | U | C | - | N/A | C | C | C | C | C | C | C | U | N/A | +| date | N/A | U | U | - | U | U | U | U | U | U | C | U | U | +| decimal | N/A | C | C | N/A | - | C | C | C | C | C | C | U | N/A | +| double | N/A | C | C | N/A | I | - | C | C | C | C | C | U | N/A | +| float | N/A | C | C | N/A | I | C | - | C | C | C | C | U | N/A | +| integer | U | C | C | N/A | C | C | C | - | C | C | C | U | N/A | +| long | U | C | C | N/A | C | C | C | C | - | C | C | U | N/A | +| short | U | C | C | N/A | C | C | C | C | C | - | C | U | N/A | +| string | C | C | C | C | I | C | C | C | C | C | - | I | U | +| timestamp | N/A | U | U | C | U | U | U | U | C | U | C | - | U | +| timestamp_ntz | N/A | N/A | N/A | C | N/A | N/A | N/A | N/A | N/A | N/A | C | C | - | **Notes:** diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs index 9ccfc3e6af..8fec5ced18 100644 --- a/native/spark-expr/src/conversion_funcs/cast.rs +++ b/native/spark-expr/src/conversion_funcs/cast.rs @@ -268,17 +268,15 @@ fn can_cast_to_string(from_type: &DataType, _options: &SparkCastOptions) -> bool } } -fn can_cast_from_timestamp_ntz(to_type: &DataType, options: &SparkCastOptions) -> bool { +fn can_cast_from_timestamp_ntz(to_type: &DataType, _options: &SparkCastOptions) -> bool { use DataType::*; match to_type { - Timestamp(_, _) | Date32 | Date64 | Utf8 => { - // incompatible - options.allow_incompat - } - _ => { - // unsupported - false - } + // TimestampNTZ -> Timestamp with timezone (interpret as UTC) + // TimestampNTZ -> Date (simple truncation, no timezone adjustment) + // TimestampNTZ -> String (format as local datetime) + // TimestampNTZ -> Long (extract microseconds directly) + Timestamp(_, Some(_)) | Date32 | Date64 | Utf8 | Int64 => true, + _ => false, } } diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs index c4f1576293..4f760d735b 100644 --- 
a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs @@ -78,6 +78,27 @@ impl ScalarUDFImpl for SparkUnixTimestamp { match args { [ColumnarValue::Array(array)] => match array.data_type() { + DataType::Timestamp(Microsecond, None) => { + // TimestampNTZ: No timezone conversion needed - simply divide microseconds + // by MICROS_PER_SECOND. TimestampNTZ stores local time without timezone. + let timestamp_array = + array.as_primitive::<TimestampMicrosecondType>(); + + let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 { + timestamp_array + .values() + .iter() + .map(|&micros| div_floor(micros, MICROS_PER_SECOND)) + .collect() + } else { + timestamp_array + .iter() + .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND))) + .collect() + }; + + Ok(ColumnarValue::Array(Arc::new(result))) + } DataType::Timestamp(_, _) => { let is_utc = self.timezone == "UTC"; let array = if is_utc diff --git a/native/spark-expr/src/kernels/temporal.rs b/native/spark-expr/src/kernels/temporal.rs index 2668e5095a..6d9fcf340e 100644 --- a/native/spark-expr/src/kernels/temporal.rs +++ b/native/spark-expr/src/kernels/temporal.rs @@ -17,7 +17,9 @@ //! temporal kernels -use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc}; +use chrono::{ + DateTime, Datelike, Duration, LocalResult, NaiveDate, Offset, TimeZone, Timelike, Utc, +}; use std::sync::Arc; @@ -153,10 +155,30 @@ where Ok(()) } -// Apply the Tz to the Naive Date Time,,convert to UTC, and return as microseconds in Unix epoch +// Apply the Tz to the Naive Date Time, convert to UTC, and return as microseconds in Unix epoch. +// This function re-interprets the local datetime in the timezone to ensure the correct DST offset +// is used for the target date (not the original date's offset). This is important when truncation +// changes the date to a different DST period (e.g., from December/PST to October/PDT). 
+// +// Note: For far-future dates (approximately beyond year 2100), chrono-tz may not accurately +// calculate DST transitions, which can result in incorrect offsets. See the compatibility +// guide for more information. #[inline] fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 { - dt.unwrap().with_timezone(&Utc).timestamp_micros() + let dt = dt.unwrap(); + let naive = dt.naive_local(); + let tz = dt.timezone(); + + // Re-interpret the local time in the timezone to get the correct DST offset + // for the truncated date. Use noon to avoid DST gaps that occur around midnight. + let noon = naive.date().and_hms_opt(12, 0, 0).unwrap_or(naive); + + let offset = match tz.offset_from_local_datetime(&noon) { + LocalResult::Single(off) | LocalResult::Ambiguous(off, _) => off.fix(), + LocalResult::None => return dt.with_timezone(&Utc).timestamp_micros(), + }; + + (naive - offset).and_utc().timestamp_micros() } #[inline] diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 9fc4b3afdf..fd90a01773 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -45,9 +45,8 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { DataTypes.StringType, DataTypes.BinaryType, DataTypes.DateType, - DataTypes.TimestampType) - // TODO add DataTypes.TimestampNTZType for Spark 3.4 and later - // https://github.com/apache/datafusion-comet/issues/378 + DataTypes.TimestampType, + DataTypes.TimestampNTZType) override def getSupportLevel(cast: Cast): SupportLevel = { if (cast.child.isInstanceOf[Literal]) { @@ -127,13 +126,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { case (dt: ArrayType, dt1: ArrayType) => isSupported(dt.elementType, dt1.elementType, timeZoneId, evalMode) case (dt: DataType, _) if dt.typeName == "timestamp_ntz" => - // 
https://github.com/apache/datafusion-comet/issues/378 - toType match { - case DataTypes.TimestampType | DataTypes.DateType | DataTypes.StringType => - Incompatible() - case _ => - unsupported(fromType, toType) - } + canCastFromTimestampNTZ(toType) case (_: DecimalType, _: DecimalType) => Compatible() case (DataTypes.StringType, _) => @@ -261,6 +254,16 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { } } + private def canCastFromTimestampNTZ(toType: DataType): SupportLevel = { + toType match { + case DataTypes.LongType => Compatible() + case DataTypes.StringType => Compatible() + case DataTypes.DateType => Compatible() + case DataTypes.TimestampType => Compatible() + case _ => unsupported(DataTypes.TimestampNTZType, toType) + } + } + private def canCastFromBoolean(toType: DataType): SupportLevel = toType match { case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | DataTypes.DoubleType => diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index a623146916..9d1eb70094 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -257,11 +257,9 @@ object CometSecond extends CometExpressionSerde[Second] { object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] { private def isSupportedInputType(expr: UnixTimestamp): Boolean = { - // Note: TimestampNTZType is not supported because Comet incorrectly applies - // timezone conversion to TimestampNTZ values. TimestampNTZ stores local time - // without timezone, so no conversion should be applied. 
expr.children.head.dataType match { case TimestampType | DateType => true + case dt if dt.typeName == "timestamp_ntz" => true case _ => false } } diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 8a68df3820..92b2ff5633 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -1036,6 +1036,41 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateTimestamps(), DataTypes.DateType) } + // CAST from TimestampNTZType + + test("cast TimestampNTZType to StringType") { + castTest(generateTimestampNTZs(), DataTypes.StringType) + } + + test("cast TimestampNTZType to DateType") { + castTest(generateTimestampNTZs(), DataTypes.DateType) + } + + test("cast TimestampNTZType to TimestampType") { + castTest(generateTimestampNTZs(), DataTypes.TimestampType) + } + + // CAST to TimestampNTZType + // Note: Spark does not support casting numeric types (Byte, Short, Int, Long, Float, Double, + // Decimal) or BinaryType to TimestampNTZType, so those tests are not included here. 
+ + ignore("cast StringType to TimestampNTZType") { + // Not yet implemented + castTest( + gen.generateStrings(dataSize, timestampPattern, 8).toDF("a"), + DataTypes.TimestampNTZType) + } + + ignore("cast DateType to TimestampNTZType") { + // Not yet implemented + castTest(generateDates(), DataTypes.TimestampNTZType) + } + + ignore("cast TimestampType to TimestampNTZType") { + // Not yet implemented + castTest(generateTimestamps(), DataTypes.TimestampNTZType) + } + // Complex Types test("cast StructType to StringType") { @@ -1276,6 +1311,20 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { .drop("str") } + private def generateTimestampNTZs(): DataFrame = { + val values = + Seq( + "2024-01-01T12:34:56.123456", + "2024-01-01T01:00:00", + "9999-12-31T23:59:59.999999", + "1970-01-01T00:00:00", + "2024-12-31T01:00:00") + withNulls(values) + .toDF("str") + .withColumn("a", col("str").cast(DataTypes.TimestampNTZType)) + .drop("str") + } + private def generateBinary(): DataFrame = { val r = new Random(0) val bytes = new Array[Byte](8) diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 1ae6926e05..815042751d 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -135,17 +135,63 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } - test("unix_timestamp - timestamp_ntz input falls back to Spark") { - // TimestampNTZ is not supported because Comet incorrectly applies timezone - // conversion. TimestampNTZ stores local time without timezone, so the unix - // timestamp should just be the value divided by microseconds per second. 
+ test("unix_timestamp - timestamp_ntz input") { + // TimestampNTZ stores local time without timezone, so the unix + // timestamp is the value divided by microseconds per second (no timezone conversion). val r = new Random(42) val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions()) ntzDF.createOrReplaceTempView("ntz_tbl") - checkSparkAnswerAndFallbackReason( - "SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz", - "unix_timestamp does not support input type: TimestampNTZType") + checkSparkAnswerAndOperator( + "SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz") + } + + test("hour/minute/second - timestamp_ntz input") { + val r = new Random(42) + val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) + val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions()) + ntzDF.createOrReplaceTempView("ntz_tbl") + checkSparkAnswerAndOperator( + "SELECT ts_ntz, hour(ts_ntz), minute(ts_ntz), second(ts_ntz) from ntz_tbl order by ts_ntz") + } + + test("date_trunc - timestamp_ntz input") { + val r = new Random(42) + val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) + // Use a reasonable date range (around year 2024) to avoid chrono-tz DST calculation + // issues with far-future dates. The default baseDate is year 3333 which is beyond + // the range where chrono-tz can reliably calculate DST transitions. 
+ val reasonableBaseDate = + new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2024-06-15 12:00:00").getTime + val ntzDF = FuzzDataGenerator.generateDataFrame( + r, + spark, + ntzSchema, + 100, + DataGenOptions(baseDate = reasonableBaseDate)) + ntzDF.createOrReplaceTempView("ntz_tbl") + for (format <- CometTruncTimestamp.supportedFormats) { + checkSparkAnswerAndOperator( + s"SELECT ts_ntz, date_trunc('$format', ts_ntz) from ntz_tbl order by ts_ntz") + } + } + + test("date_format - timestamp_ntz input") { + // Comet's date_format with timestamp_ntz is only compatible with UTC timezone because + // the cast from timestamp_ntz to timestamp interprets the value as UTC, not the session + // timezone. For non-UTC timezones, Comet falls back to Spark. + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + val r = new Random(42) + val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) + val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions()) + ntzDF.createOrReplaceTempView("ntz_tbl") + val supportedFormats = + CometDateFormat.supportedFormats.keys.toSeq.filterNot(_.contains("'")) + for (format <- supportedFormats) { + checkSparkAnswerAndOperator( + s"SELECT ts_ntz, date_format(ts_ntz, '$format') from ntz_tbl order by ts_ntz") + } + } } test("unix_timestamp - string input falls back to Spark") {