diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md index 5474894108..78a5712f2a 100644 --- a/docs/spark_expressions_support.md +++ b/docs/spark_expressions_support.md @@ -205,7 +205,7 @@ - [ ] timestamp_micros - [ ] timestamp_millis - [ ] timestamp_seconds -- [ ] to_date +- [x] to_date - [ ] to_timestamp - [ ] to_timestamp_ltz - [ ] to_timestamp_ntz diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index ff75de763b..59e726dfeb 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::datetime_funcs::{to_date, to_timestamp}; use crate::hash_funcs::*; use crate::math_funcs::abs::abs; use crate::math_funcs::checked_arithmetic::{checked_add, checked_div, checked_mul, checked_sub}; @@ -173,6 +174,12 @@ pub fn create_comet_physical_fun_with_eval_mode( let func = Arc::new(spark_modulo); make_comet_scalar_udf!("spark_modulo", func, without data_type, fail_on_error) } + "to_timestamp" => { + make_comet_scalar_udf!("to_timestamp", to_timestamp, without data_type, fail_on_error) + } + "to_date" => { + make_comet_scalar_udf!("to_date", to_date, without data_type, fail_on_error) + } "abs" => { let func = Arc::new(abs); make_comet_scalar_udf!("abs", func, without data_type) diff --git a/native/spark-expr/src/conversion_funcs/mod.rs b/native/spark-expr/src/conversion_funcs/mod.rs index 8e3bbe1c6e..a80c21e32a 100644 --- a/native/spark-expr/src/conversion_funcs/mod.rs +++ b/native/spark-expr/src/conversion_funcs/mod.rs @@ -20,3 +20,5 @@ pub mod cast; mod numeric; mod string; mod utils; + +pub(crate) use string::cast_string_to_date; diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index 5bafc1d287..9e90f59420 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ 
b/native/spark-expr/src/datetime_funcs/mod.rs @@ -20,6 +20,7 @@ mod date_trunc; mod extract_date_part; mod make_date; mod timestamp_trunc; +mod to_timestamp; mod unix_timestamp; pub use date_diff::SparkDateDiff; @@ -29,4 +30,5 @@ pub use extract_date_part::SparkMinute; pub use extract_date_part::SparkSecond; pub use make_date::SparkMakeDate; pub use timestamp_trunc::TimestampTruncExpr; +pub use to_timestamp::{to_date, to_timestamp}; pub use unix_timestamp::SparkUnixTimestamp; diff --git a/native/spark-expr/src/datetime_funcs/to_timestamp.rs b/native/spark-expr/src/datetime_funcs/to_timestamp.rs new file mode 100644 index 0000000000..ca28b1d88b --- /dev/null +++ b/native/spark-expr/src/datetime_funcs/to_timestamp.rs @@ -0,0 +1,594 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use crate::conversion_funcs::cast_string_to_date;
+use crate::EvalMode;
+use arrow::array::{Array, ArrayRef, StringArray};
+use arrow::compute::cast;
+use arrow::datatypes::DataType;
+use arrow::datatypes::DataType::Timestamp;
+use arrow::datatypes::TimeUnit::Microsecond;
+use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Timelike};
+use chrono_tz::Tz;
+use datafusion::common::internal_err;
+use datafusion::common::{DataFusionError, Result};
+use datafusion::functions::utils::make_scalar_function;
+use datafusion::logical_expr::ColumnarValue;
+use datafusion::scalar::ScalarValue;
+use std::str::FromStr;
+use std::sync::Arc;
+
+const TO_TIMESTAMP: &str = "to_timestamp";
+const TO_DATE: &str = "to_date";
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+enum FractionPrecision {
+    Millis,
+    Micros,
+    Nanos,
+}
+
+/// Detect Spark fractional precision from a format string.
+fn detect_fraction_precision(fmt: &str) -> Option<FractionPrecision> {
+    let count = fmt.chars().filter(|&c| c == 'S').count();
+    match count {
+        0 => None,
+        1..=3 => Some(FractionPrecision::Millis),
+        4..=6 => Some(FractionPrecision::Micros),
+        _ => Some(FractionPrecision::Nanos),
+    }
+}
+
+/// Convert a Spark/Java SimpleDateFormat pattern to a chrono strftime pattern.
+fn spark_to_chrono(fmt: &str) -> (String, Option<FractionPrecision>) {
+    let precision = detect_fraction_precision(fmt);
+
+    let mut out = fmt.to_string();
+
+    // Date
+    out = out.replace("yyyy", "%Y");
+    out = out.replace("MM", "%m");
+    out = out.replace("dd", "%d");
+
+    // Time
+    out = out.replace("HH", "%H");
+    out = out.replace("mm", "%M");
+    out = out.replace("ss", "%S");
+
+    // Fractions — longest match first to avoid partial replacement
+    out = out
+        .replace(".SSSSSSSSS", "%.f")
+        .replace(".SSSSSS", "%.f")
+        .replace(".SSS", "%.f");
+
+    // Timezones
+    out = out.replace("XXX", "%:z");
+    out = out.replace("Z", "%z");
+
+    (out, precision)
+}
+
+/// Returns true when the Spark format string contains a time component.
+fn spark_format_has_time(fmt: &str) -> bool {
+    fmt.contains("HH") || fmt.contains("mm") || fmt.contains("ss")
+}
+
+/// Parse a string value using a Spark format, returning a `NaiveDateTime`.
+/// Date-only formats are expanded to midnight.
+fn parse_spark_naive(
+    value: &str,
+    spark_fmt: &str,
+) -> Result<(NaiveDateTime, Option<FractionPrecision>), chrono::ParseError> {
+    let (chrono_fmt, precision) = spark_to_chrono(spark_fmt);
+
+    if spark_format_has_time(spark_fmt) {
+        let ts = NaiveDateTime::parse_from_str(value, &chrono_fmt)?;
+        Ok((ts, precision))
+    } else {
+        let date = NaiveDate::parse_from_str(value, &chrono_fmt)?;
+        Ok((date.and_hms_opt(0, 0, 0).unwrap(), precision))
+    }
+}
+
+/// Truncate sub-second precision to what the Spark format string actually represents.
+fn normalize_fraction(
+    mut ts: NaiveDateTime,
+    precision: Option<FractionPrecision>,
+) -> Option<NaiveDateTime> {
+    match precision {
+        Some(FractionPrecision::Millis) => {
+            let ms = ts.and_utc().timestamp_subsec_millis();
+            ts = ts.with_nanosecond(ms * 1_000_000)?;
+        }
+        Some(FractionPrecision::Micros) => {
+            let us = ts.and_utc().timestamp_subsec_micros();
+            ts = ts.with_nanosecond(us * 1_000)?;
+        }
+        Some(FractionPrecision::Nanos) | None => {}
+    }
+    Some(ts)
+}
+
+/// Parse a string using a Spark format pattern and timezone, returning UTC microseconds.
+pub fn spark_to_timestamp_parse(
+    value: &str,
+    spark_fmt: &str,
+    tz: Tz,
+) -> Result<i64> {
+    let (naive, precision) = parse_spark_naive(value, spark_fmt).map_err(|_| {
+        DataFusionError::Plan(format!("Error parsing '{value}' with format '{spark_fmt}'"))
+    })?;
+
+    let naive = normalize_fraction(naive, precision)
+        .ok_or_else(|| DataFusionError::Plan("Invalid fractional timestamp".into()))?;
+
+    let local: DateTime<Tz> = tz.from_local_datetime(&naive).single().ok_or_else(|| {
+        DataFusionError::Plan(format!("Ambiguous or invalid datetime in timezone {tz}"))
+    })?;
+
+    Ok(local.timestamp_micros())
+}
+
+// to_timestamp — As a Comet scalar UDF
+pub fn to_timestamp(args: &[ColumnarValue], fail_on_error: bool) -> Result<ColumnarValue> {
+    make_scalar_function(
+        move |input_args| spark_to_timestamp(input_args, fail_on_error),
+        vec![],
+    )(args)
+}
+
+/// Core implementation of `to_timestamp(value, format[, timezone])`.
+///
+/// Accepts a string, date, or timestamp column as the first argument.
+/// Date and timestamp inputs are cast to `TimestampMicrosecond` via Arrow before parsing,
+/// matching Spark's behaviour for `GetTimestamp`.
+pub fn spark_to_timestamp(args: &[ArrayRef], fail_on_error: bool) -> Result<ArrayRef> {
+    if args.len() < 2 || args.len() > 3 {
+        return internal_err!(
+            "`{}` function requires 2 or 3 arguments, got {} arguments",
+            TO_TIMESTAMP,
+            args.len()
+        );
+    }
+
+    // Normalise the first argument to StringArray; Date32 / Timestamp inputs are
+    // cast to string first so they can be re-parsed with the supplied format.
+    let input_array = normalise_to_string(&args[0])?;
+    let dates: &StringArray = input_array
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "`{TO_TIMESTAMP}`: first argument must be a string, date, or timestamp column"
+            ))
+        })?;
+
+    let format_array: &StringArray =
+        args[1]
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "`{TO_TIMESTAMP}`: second argument (format) must be a string column"
+                ))
+            })?;
+
+    let tz: Tz = args
+        .get(2)
+        .and_then(|arg| arg.as_any().downcast_ref::<StringArray>())
+        .and_then(|v| {
+            if v.is_null(0) {
+                None
+            } else {
+                Tz::from_str(v.value(0)).ok()
+            }
+        })
+        .unwrap_or(Tz::UTC);
+
+    let utc_tz: Arc<str> = Arc::from(chrono_tz::UTC.name());
+
+    let values: Result<Vec<ScalarValue>> = dates
+        .iter()
+        .enumerate()
+        .map(|(index, value)| {
+            let format = if format_array.len() == 1 {
+                if format_array.is_null(0) {
+                    None
+                } else {
+                    Some(format_array.value(0))
+                }
+            } else if format_array.is_null(index) {
+                None
+            } else {
+                Some(format_array.value(index))
+            };
+
+            match (value, format) {
+                (None, _) | (_, None) => ScalarValue::Int64(None)
+                    .cast_to(&Timestamp(Microsecond, Some(Arc::clone(&utc_tz)))),
+                (Some(date_raw), Some(format)) => {
+                    let parsed_value = spark_to_timestamp_parse(date_raw, format, tz);
+
+                    match (parsed_value, fail_on_error) {
+                        (Ok(v), _) => ScalarValue::Int64(Some(v))
+                            .cast_to(&Timestamp(Microsecond, Some(Arc::clone(&utc_tz)))),
+                        (Err(err), true) => Err(err),
+                        (Err(_), false) => ScalarValue::Int64(None)
+                            .cast_to(&Timestamp(Microsecond, Some(Arc::clone(&utc_tz)))),
+                    }
+                }
+            }
+        })
+        .collect::<Result<Vec<ScalarValue>>>();
+
+    let output: ArrayRef = ScalarValue::iter_to_array(values?)?;
+    Ok(output)
+}
+
+// to_date — As a Comet scalar UDF
+
+pub fn to_date(args: &[ColumnarValue], fail_on_error: bool) -> Result<ColumnarValue> {
+    make_scalar_function(
+        move |input_args| spark_to_date(input_args, fail_on_error),
+        vec![],
+    )(args)
+}
+
+/// Core implementation of `to_date(value[, format])`.
+///
+/// - Without format: delegates to `cast_string_to_date` (Spark's default ISO parsing).
+/// - With format: parses via `spark_to_timestamp_parse`, then drops the time component.
+/// - Date inputs pass through unchanged; Timestamp inputs are truncated to date.
+pub fn spark_to_date(args: &[ArrayRef], fail_on_error: bool) -> Result<ArrayRef> {
+    if args.is_empty() || args.len() > 2 {
+        return internal_err!(
+            "`{}` function requires 1 or 2 arguments, got {} arguments",
+            TO_DATE,
+            args.len()
+        );
+    }
+
+    let eval_mode = if fail_on_error {
+        EvalMode::Ansi
+    } else {
+        EvalMode::Legacy
+    };
+
+    // Fast path: Date32 input → return as-is.
+    if args[0].data_type() == &DataType::Date32 {
+        return Ok(Arc::clone(&args[0]));
+    }
+
+    // Fast path: Timestamp input without format → cast to Date32 via Arrow.
+    if matches!(args[0].data_type(), DataType::Timestamp(_, _)) && args.len() == 1 {
+        let date_array = cast(&args[0], &DataType::Date32)?;
+        return Ok(date_array);
+    }
+
+    // Normalise input to StringArray for all remaining paths.
+    let input_array = normalise_to_string(&args[0])?;
+
+    if args.len() == 1 {
+        // No format — use the existing Spark-compatible ISO parser.
+        return cast_string_to_date(&input_array, &DataType::Date32, eval_mode)
+            .map_err(DataFusionError::from);
+    }
+
+    // With format — parse via spark_to_timestamp_parse then truncate to date.
+    let format_array: &StringArray =
+        args[1]
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "`{TO_DATE}`: second argument (format) must be a string column"
+                ))
+            })?;
+
+    let string_array: &StringArray = input_array
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "`{TO_DATE}`: first argument must be a string, date, or timestamp"
+            ))
+        })?;
+
+    let values: Result<Vec<ScalarValue>> = string_array
+        .iter()
+        .enumerate()
+        .map(|(index, value)| {
+            let format = if format_array.len() == 1 {
+                if format_array.is_null(0) {
+                    None
+                } else {
+                    Some(format_array.value(0))
+                }
+            } else if format_array.is_null(index) {
+                None
+            } else {
+                Some(format_array.value(index))
+            };
+
+            match (value, format) {
+                (None, _) | (_, None) => Ok(ScalarValue::Date32(None)),
+                (Some(date_raw), Some(format)) => {
+                    let parsed = spark_to_timestamp_parse(date_raw, format, Tz::UTC);
+                    match (parsed, fail_on_error) {
+                        (Ok(micros), _) => {
+                            // Convert UTC microseconds to days since epoch.
+                            let days = (micros / 1_000_000 / 86_400) as i32;
+                            Ok(ScalarValue::Date32(Some(days)))
+                        }
+                        (Err(err), true) => Err(err),
+                        (Err(_), false) => Ok(ScalarValue::Date32(None)),
+                    }
+                }
+            }
+        })
+        .collect::<Result<Vec<ScalarValue>>>();
+
+    let output: ArrayRef = ScalarValue::iter_to_array(values?)?;
+    Ok(output)
+}
+
+// Helpers
+
+/// Convert a Date32 or Timestamp array to a StringArray using Arrow's built-in cast,
+/// so that string-based parsers can operate on all supported input types.
+fn normalise_to_string(array: &ArrayRef) -> Result<ArrayRef> {
+    match array.data_type() {
+        DataType::Utf8 => Ok(Arc::clone(array)),
+        DataType::Date32 | DataType::Timestamp(_, _) => {
+            cast(array, &DataType::Utf8).map_err(DataFusionError::from)
+        }
+        other => internal_err!("Unsupported input type for to_timestamp/to_date: {other}"),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, Date32Array, StringArray, TimestampMicrosecondArray};
+    use chrono::{NaiveDate, NaiveDateTime};
+    use chrono_tz::UTC;
+
+    #[test]
+    fn detects_no_fraction() {
+        assert_eq!(detect_fraction_precision("yyyy-MM-dd HH:mm:ss"), None);
+    }
+
+    #[test]
+    fn detects_millis_precision() {
+        assert_eq!(
+            detect_fraction_precision("yyyy-MM-dd HH:mm:ss.SSS"),
+            Some(FractionPrecision::Millis)
+        );
+    }
+
+    #[test]
+    fn detects_micros_precision() {
+        assert_eq!(
+            detect_fraction_precision("yyyy-MM-dd HH:mm:ss.SSSSSS"),
+            Some(FractionPrecision::Micros)
+        );
+    }
+
+    #[test]
+    fn detects_nanos_precision() {
+        assert_eq!(
+            detect_fraction_precision("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"),
+            Some(FractionPrecision::Nanos)
+        );
+    }
+
+    #[test]
+    fn converts_basic_date_format() {
+        let (fmt, precision) = spark_to_chrono("yyyy-MM-dd");
+        assert_eq!(fmt, "%Y-%m-%d");
+        assert_eq!(precision, None);
+    }
+
+    #[test]
+    fn converts_timestamp_with_millis() {
+        let (fmt, precision) = spark_to_chrono("yyyy-MM-dd HH:mm:ss.SSS");
+
+        assert_eq!(fmt, "%Y-%m-%d %H:%M:%S%.f");
+        assert_eq!(precision, Some(FractionPrecision::Millis));
+    }
+
+    #[test]
+    fn converts_timestamp_with_timezone() {
+        let (fmt, _) = spark_to_chrono("yyyy-MM-dd HH:mm:ssXXX");
+
+        assert_eq!(fmt, "%Y-%m-%d %H:%M:%S%:z");
+    }
+
+    #[test]
+    fn detects_date_only_format() {
+        assert!(!spark_format_has_time("yyyy-MM-dd"));
+    }
+
+    #[test]
+    fn detects_timestamp_format() {
+        assert!(spark_format_has_time("yyyy-MM-dd HH:mm:ss"));
+    }
+
+    #[test]
+    fn parses_date_as_midnight_timestamp() {
+        let (ts, _) = parse_spark_naive("2026-01-30",
"yyyy-MM-dd").unwrap(); + + let expected = NaiveDate::from_ymd_opt(2026, 1, 30) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + + assert_eq!(ts, expected); + } + + #[test] + fn parses_timestamp_with_millis() { + let (ts, precision) = + parse_spark_naive("2026-01-30 10:30:52.123", "yyyy-MM-dd HH:mm:ss.SSS").unwrap(); + + assert_eq!(precision, Some(FractionPrecision::Millis)); + assert_eq!(ts.and_utc().timestamp_subsec_millis(), 123); + } + + #[test] + fn normalizes_millis_precision() { + let ts = + NaiveDateTime::parse_from_str("2026-01-30 10:30:52.123456", "%Y-%m-%d %H:%M:%S%.6f") + .unwrap(); + + let normalized = normalize_fraction(ts, Some(FractionPrecision::Millis)).unwrap(); + + assert_eq!(normalized.and_utc().timestamp_subsec_nanos(), 123_000_000); + } + + #[test] + fn normalizes_micros_precision() { + let ts = + NaiveDateTime::parse_from_str("2026-01-30 10:30:52.123456", "%Y-%m-%d %H:%M:%S%.6f") + .unwrap(); + + let normalized = normalize_fraction(ts, Some(FractionPrecision::Micros)).unwrap(); + + assert_eq!(normalized.and_utc().timestamp_subsec_nanos(), 123_456_000); + } + + #[test] + fn parses_timestamp_and_preserves_millis() { + let micros = + spark_to_timestamp_parse("2026-01-30 10:30:52.123", "yyyy-MM-dd HH:mm:ss.SSS", UTC) + .unwrap(); + + let expected = + NaiveDateTime::parse_from_str("2026-01-30 10:30:52.123", "%Y-%m-%d %H:%M:%S%.3f") + .unwrap() + .and_utc() + .timestamp_micros(); + + assert_eq!(micros, expected); + } + + #[test] + fn parses_date_literal_as_midnight() { + let micros = spark_to_timestamp_parse("2026-01-30", "yyyy-MM-dd", UTC).unwrap(); + + let expected = NaiveDate::from_ymd_opt(2026, 1, 30) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap() + .and_utc() + .timestamp_micros(); + + assert_eq!(micros, expected); + } + + #[test] + fn supports_two_arguments_without_timezone() { + let dates: ArrayRef = + Arc::new(StringArray::from(vec![Some("2026-01-30 10:30:52")])) as ArrayRef; + let formats: ArrayRef = + 
Arc::new(StringArray::from(vec![Some("yyyy-MM-dd HH:mm:ss")])) as ArrayRef;
+
+        let result = spark_to_timestamp(&[dates, formats], true).unwrap();
+        let ts = result
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap();
+
+        assert!(!ts.is_null(0));
+    }
+
+    #[test]
+    fn returns_null_on_parse_error_when_fail_on_error_is_false() {
+        let dates: ArrayRef = Arc::new(StringArray::from(vec![Some("malformed")])) as ArrayRef;
+        let formats: ArrayRef = Arc::new(StringArray::from(vec![Some("yyyy-MM-dd")])) as ArrayRef;
+
+        let result = spark_to_timestamp(&[dates, formats], false).unwrap();
+        let ts = result
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap();
+
+        assert!(ts.is_null(0));
+    }
+
+    #[test]
+    fn to_date_with_format_returns_date32() {
+        let dates: ArrayRef = Arc::new(StringArray::from(vec![Some("2026/01/30")])) as ArrayRef;
+        let formats: ArrayRef = Arc::new(StringArray::from(vec![Some("yyyy/MM/dd")])) as ArrayRef;
+
+        let result = spark_to_date(&[dates, formats], false).unwrap();
+        let arr = result.as_any().downcast_ref::<Date32Array>().unwrap();
+
+        assert!(!arr.is_null(0));
+        // 2026-01-30 = days since epoch
+        let expected = NaiveDate::from_ymd_opt(2026, 1, 30)
+            .unwrap()
+            .signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
+            .num_days() as i32;
+        assert_eq!(arr.value(0), expected);
+    }
+
+    #[test]
+    fn to_date_no_format_returns_date32() {
+        let dates: ArrayRef = Arc::new(StringArray::from(vec![Some("2026-01-30")])) as ArrayRef;
+
+        let result = spark_to_date(&[dates], false).unwrap();
+        let arr = result.as_any().downcast_ref::<Date32Array>().unwrap();
+
+        assert!(!arr.is_null(0));
+    }
+
+    #[test]
+    fn to_date_null_input_returns_null() {
+        let dates: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>])) as ArrayRef;
+
+        let result = spark_to_date(&[dates], false).unwrap();
+        let arr = result.as_any().downcast_ref::<Date32Array>().unwrap();
+
+        assert!(arr.is_null(0));
+    }
+
+    #[test]
+    fn to_date_malformed_returns_null_when_not_fail_on_error() {
+        let dates: ArrayRef =
Arc::new(StringArray::from(vec![Some("not-a-date")])) as ArrayRef;
+        let formats: ArrayRef = Arc::new(StringArray::from(vec![Some("yyyy-MM-dd")])) as ArrayRef;
+
+        let result = spark_to_date(&[dates, formats], false).unwrap();
+        let arr = result.as_any().downcast_ref::<Date32Array>().unwrap();
+
+        assert!(arr.is_null(0));
+    }
+
+    #[test]
+    fn to_date_date32_input_passes_through() {
+        let days = NaiveDate::from_ymd_opt(2026, 1, 30)
+            .unwrap()
+            .signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
+            .num_days() as i32;
+        let input: ArrayRef = Arc::new(Date32Array::from(vec![Some(days)])) as ArrayRef;
+
+        let result = spark_to_date(&[input], false).unwrap();
+        let arr = result.as_any().downcast_ref::<Date32Array>().unwrap();
+
+        assert_eq!(arr.value(0), days);
+    }
+}
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index c5880e00ed..ba307d5bb3 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -213,7 +213,9 @@ object QueryPlanSerde extends Logging with CometExprShim {
       classOf[WeekDay] -> CometWeekDay,
       classOf[DayOfYear] -> CometDayOfYear,
       classOf[WeekOfYear] -> CometWeekOfYear,
-      classOf[Quarter] -> CometQuarter)
+      classOf[Quarter] -> CometQuarter,
+      classOf[GetTimestamp] -> CometGetTimestamp,
+      classOf[ParseToDate] -> CometParseToDate)
 
   private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[Cast] -> CometCast)
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index d36b6a3b40..e808be592e 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,8 +21,8 @@ package org.apache.comet.serde
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} -import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, GetTimestamp, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, ParseToDate, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampNTZType, TimestampType} import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -175,6 +175,93 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF getDateField(expr, CometGetDateField.Quarter, inputs, binding) } } +object CometGetTimestamp extends CometExpressionSerde[GetTimestamp] { + + override def getSupportLevel(expr: GetTimestamp): SupportLevel = { + expr.left.dataType match { + case StringType | DateType | TimestampType => Compatible() + case _: TimestampNTZType => + Unsupported( + Some( + "to_timestamp/to_date does not support TimestampNTZ input: " + + "timezone conversion would be incorrectly applied")) + case other => + Unsupported(Some(s"to_timestamp/to_date does not support input type: $other")) + } + } + + /** + * Convert a Spark expression into a protocol buffer representation that can be passed into + * native code. + * + * @param expr + * The Spark expression. + * @param inputs + * The input attributes. + * @param binding + * Whether the attributes are bound (this is only relevant in aggregate expressions). + * @return + * Protocol buffer representation, or None if the expression could not be converted. 
In this + * case it is expected that the input expression will have been tagged with reasons why it + * could not be converted. + */ + override def convert( + expr: GetTimestamp, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val leftExpr: Option[Expr] = + exprToProtoInternal(expr.left, inputs, binding) // timestamp or date + val rightExpr: Option[Expr] = exprToProtoInternal(expr.right, inputs, binding) // format + val tZ: Option[Expr] = + expr.timeZoneId.flatMap(tz => exprToProtoInternal(Literal(tz), inputs, binding)) + scalarFunctionExprToProtoWithReturnType( + "to_timestamp", + expr.dataType, + failOnError = expr.failOnError, + args = leftExpr, + rightExpr, + tZ) + } +} + +object CometParseToDate extends CometExpressionSerde[ParseToDate] { + + /** + * Convert a Spark expression into a protocol buffer representation that can be passed into + * native code. + * + * @param expr + * The Spark expression. + * @param inputs + * The input attributes. + * @param binding + * Whether the attributes are bound (this is only relevant in aggregate expressions). + * @return + * Protocol buffer representation, or None if the expression could not be converted. In this + * case it is expected that the input expression will have been tagged with reasons why it + * could not be converted. 
+ */ + override def convert( + expr: ParseToDate, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val failOnError = parseToDateFailOnError(expr) + val childExpr: Option[Expr] = exprToProtoInternal(expr.left, inputs, binding) + val optExpr = expr.format match { + case Some(format) => + val formatExpr: Option[Expr] = exprToProtoInternal(format, inputs, binding) + scalarFunctionExprToProtoWithReturnType( + "to_date", + expr.dataType, + failOnError, + childExpr, + formatExpr) + case None => + scalarFunctionExprToProtoWithReturnType("to_date", expr.dataType, failOnError, childExpr) + } + optExprWithInfo(optExpr, expr, expr.children: _*) + } +} object CometHour extends CometExpressionSerde[Hour] { override def convert( diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 600931c346..ab715e2641 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -20,6 +20,7 @@ package org.apache.comet.shims import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.comet.expressions.CometEvalMode import org.apache.comet.serde.CommonStringExprs @@ -32,6 +33,8 @@ trait CometExprShim extends CommonStringExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) + def parseToDateFailOnError(expr: ParseToDate): Boolean = SQLConf.get.ansiEnabled + protected def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE def versionSpecificExprToProtoInternal( diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 8e9cb1c07b..1b11965f12 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ 
b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -35,6 +35,8 @@ trait CometExprShim extends CommonStringExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) + def parseToDateFailOnError(expr: ParseToDate): Boolean = expr.ansiEnabled + protected def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE def versionSpecificExprToProtoInternal( diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 2c5cebd166..c8bbbda27e 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -38,6 +38,8 @@ trait CometExprShim extends CommonStringExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) + def parseToDateFailOnError(expr: ParseToDate): Boolean = expr.ansiEnabled + protected def binaryOutputStyle: BinaryOutputStyle = { SQLConf.get .getConf(SQLConf.BINARY_OUTPUT_STYLE) diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/to_date.sql b/spark/src/test/resources/sql-tests/expressions/datetime/to_date.sql new file mode 100644 index 0000000000..6f89dc2bd7 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_date.sql @@ -0,0 +1,46 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ConfigMatrix: parquet.enable.dictionary=false,true + +-- to_date function +statement +CREATE TABLE test_to_date(col STRING) USING parquet + +statement +INSERT INTO test_to_date VALUES ('2026-01-30'), ('2026-03-10'), (NULL) + +query +SELECT col, to_date(col) FROM test_to_date + +statement +CREATE TABLE test_to_date_fmt(col STRING) USING parquet + +statement +INSERT INTO test_to_date_fmt VALUES ('2026/01/30'), ('2026/03/10'), (NULL) + +query +SELECT col, to_date(col, 'yyyy/MM/dd') FROM test_to_date_fmt + +query +SELECT to_date('2026-01-30') + +query +SELECT to_date('2026/01/30', 'yyyy/MM/dd') + +query +SELECT to_date('2026-01-30 10:30:00') diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 1ae6926e05..d665928cbb 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -114,6 +114,145 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } + test("to_date parses date literal") { + withoutConstantFolding { + checkSparkAnswerAndOperator("SELECT to_date('2026-01-30')") + } + } + + test("to_date parses date literal with explicit format") { + withoutConstantFolding { + checkSparkAnswerAndOperator("SELECT to_date('2026/01/30', 'yyyy/MM/dd')") + } + } + + test("to_date parses date string column") { + withTempView("string_tbl") { + val schema = StructType(Seq(StructField("dt_str", 
DataTypes.StringType, nullable = true))) + + val data = Seq(Row("2026-01-30"), Row("2026-03-10"), Row("2026-10-10"), Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + checkSparkAnswerAndOperator("SELECT dt_str, to_date(dt_str) FROM string_tbl") + } + } + + test("to_date parses date string column with explicit format") { + withoutConstantFolding { + withTempView("string_tbl") { + val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, nullable = true))) + + val data = Seq(Row("2026/01/30"), Row("2026/03/10"), Row("2026/10/10"), Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + checkSparkAnswerAndOperator( + "SELECT dt_str, to_date(dt_str, 'yyyy/MM/dd') FROM string_tbl") + } + } + } + + test("to_date parses timestamp literal string") { + withoutConstantFolding { + checkSparkAnswerAndOperator("SELECT to_date('2026-01-30 04:17:52')") + } + } + + test("to_date parses timestamp string column") { + withTempView("string_tbl") { + val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, nullable = true))) + + val data = Seq( + Row("2026-01-30 04:17:52"), + Row("2026-03-10 04:17:52"), + Row("2026-10-10 04:17:52"), + Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + checkSparkAnswerAndOperator("SELECT dt_str, to_date(dt_str) FROM string_tbl") + } + } + + test("to_date returns null for malformed input when ANSI is disabled") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { + withTempView("string_tbl") { + val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, nullable = true))) + + val data = Seq(Row("2026-01-30"), Row("malformed"), Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + 
checkSparkAnswerAndOperator("SELECT dt_str, to_date(dt_str) FROM string_tbl") + } + } + } + + test("to_date throws for malformed input when ANSI is enabled") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + withTempView("string_tbl") { + val schema = StructType(Seq(StructField("dt_str", DataTypes.StringType, nullable = true))) + + val data = Seq(Row("2026-01-30"), Row("malformed"), Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("string_tbl") + + checkSparkAnswerMaybeThrows(sql("SELECT dt_str, to_date(dt_str) FROM string_tbl")) match { + case (Some(sparkExc), Some(cometExc)) => + assert(sparkExc.getMessage.toLowerCase.contains("date")) + assert(cometExc.getMessage.toLowerCase.contains("date")) + case (Some(_), None) => + fail("Expected Comet to throw when Spark throws") + case (None, Some(cometExc)) => + throw cometExc + case _ => + fail("Expected both Spark and Comet to throw in ANSI mode") + } + } + } + } + + test("to_date with DateType input passes through unchanged") { + withTempView("date_tbl") { + val schema = StructType(Seq(StructField("dt", DataTypes.DateType, nullable = true))) + val data = Seq(Row(java.sql.Date.valueOf("2026-01-30")), Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("date_tbl") + + checkSparkAnswerAndOperator("SELECT dt, to_date(dt) FROM date_tbl") + } + } + + test("to_date with TimestampType input truncates to date") { + withTempView("ts_tbl") { + val schema = + StructType(Seq(StructField("ts", DataTypes.TimestampType, nullable = true))) + val data = Seq( + Row(java.sql.Timestamp.valueOf("2026-01-30 04:17:52")), + Row(java.sql.Timestamp.valueOf("2026-03-10 23:59:59")), + Row(null)) + + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("ts_tbl") + + checkSparkAnswerAndOperator("SELECT ts, to_date(ts) FROM ts_tbl") + } + } + test("unix_timestamp - timestamp 
input") { createTimestampTestData.createOrReplaceTempView("tbl") for (timezone <- Seq("UTC", "America/Los_Angeles")) { @@ -395,4 +534,9 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH // Test null handling checkSparkAnswerAndOperator("SELECT unix_date(NULL)") } + + private def withoutConstantFolding[A](f: => A): Unit = + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding")(f) }