diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md index 5474894108..0163a0a16f 100644 --- a/docs/spark_expressions_support.md +++ b/docs/spark_expressions_support.md @@ -470,7 +470,7 @@ ### url_funcs -- [ ] parse_url +- [x] parse_url - [ ] url_decode - [ ] url_encode diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 1030e30aaf..f244c7db8a 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -57,6 +57,8 @@ use datafusion_spark::function::math::width_bucket::SparkWidthBucket; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; use datafusion_spark::function::string::space::SparkSpace; +use datafusion_spark::function::url::parse_url::ParseUrl; +use datafusion_spark::function::url::try_parse_url::TryParseUrl; use futures::poll; use futures::stream::StreamExt; use jni::objects::JByteBuffer; @@ -404,6 +406,8 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCrc32::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSpace::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitCount::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(ParseUrl::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(TryParseUrl::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala b/spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala new file mode 100644 index 0000000000..7ab82bfa6e --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} + +/** + * Serializer contract for Spark `Invoke` expressions that wrap a private evaluator object. + * + * In Spark 4.0 several built-in functions (e.g. `parse_url`) became `RuntimeReplaceable` and are + * rewritten by the analyser into an `Invoke(evaluator, arg, ...)` node whose first child is a + * `Literal` of `ObjectType` holding a private evaluator instance. The Spark expression class that + * Comet normally dispatches on (e.g. `ParseUrl`) is therefore never seen at serde time on Spark + * 4.0. + * + * Implementors expose: + * - [[invokeTargetClassName]] - the fully-qualified name of the evaluator class embedded in the + * first `Literal(_, ObjectType)` child of the `Invoke` node. This is the key used by + * [[QueryPlanSerde]] to route the node to the correct handler. + * - [[convertFromInvoke]] - the actual serialization logic, receiving the raw `Invoke` + * expression (with all children including the evaluator literal). + * + * To register a new handler, add the object to [[QueryPlanSerde.invokeSerdeByTargetClassName]]. + */ +trait CometInvokeExpressionSerde { + + /** + * Fully-qualified class name of the private evaluator object held in the first child + * `Literal(_, ObjectType(...))` of the `Invoke` node. + * + * Example: `"org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator"` + */ + def invokeTargetClassName: String + + /** + * Serialize the `Invoke` expression into a Comet proto `Expr`. + * + * @param expr + * The raw `Invoke` expression node (first child is the evaluator literal). + * @param inputs + * Resolved input attributes for the enclosing operator. + * @param binding + * Whether attributes are bound (relevant for aggregate expressions). + * @return + * `Some(Expr)` on success, `None` if the expression cannot be handled (the implementor is + * responsible for calling `withInfo` to record the reason). + */ + def convertFromInvoke( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] +} diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index c5880e00ed..174e358965 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -44,6 +44,35 @@ import org.apache.comet.shims.CometExprShim */ object QueryPlanSerde extends Logging with CometExprShim { + // Registry of Invoke-expression handlers, keyed by the fully-qualified class name of the + // evaluator object embedded in the first Literal(_, ObjectType(...)) child of the Invoke node. + // To support a new RuntimeReplaceable expression rewritten to Invoke in Spark 4.0, implement + // CometInvokeExpressionSerde and add the object here. + private val invokeSerdeByTargetClassName + : Map[String, CometInvokeExpressionSerde] = + Seq(CometParseUrl).map(s => s.invokeTargetClassName -> s).toMap + + // Extracts the target object class name from an Invoke-like expression. + private def invokeTargetClassName(expr: Expression): Option[String] = { + expr.children.headOption.flatMap { + _.dataType match { + case objectType: ObjectType => Some(objectType.cls.getName) + case _ => None + } + } + } + + // Routes Invoke expressions to a handler based on the target object class name. + private def convertInvokeExpression( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = + for { + targetClassName <- invokeTargetClassName(expr) + handler <- invokeSerdeByTargetClassName.get(targetClassName) + result <- handler.convertFromInvoke(expr, inputs, binding) + } yield result + private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[ArrayAppend] -> CometArrayAppend, classOf[ArrayCompact] -> CometArrayCompact, @@ -159,6 +188,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[Like] -> CometLike, classOf[Lower] -> CometLower, classOf[OctetLength] -> CometScalarFunction("octet_length"), + classOf[ParseUrl] -> CometParseUrl, classOf[RegExpReplace] -> CometRegExpReplace, classOf[Reverse] -> CometReverse, classOf[RLike] -> CometRLike, @@ -557,6 +587,9 @@ object QueryPlanSerde extends Logging with CometExprShim { // `PromotePrecision` is just a wrapper, don't need to serialize it. exprToProtoInternal(child, inputs, binding) + case expr if expr.prettyName == "invoke" => + convertInvokeExpression(expr, inputs, binding) + case expr => QueryPlanSerde.exprSerdeMap.get(expr.getClass) match { case Some(handler) => diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 64ba644048..44de28cce5 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -21,15 +21,16 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, ConcatWs, Expression, If, InitCap, IsNull, Left, Length, Like, Literal, Lower, RegExpReplace, Right, RLike, StringLPad, StringRepeat, StringRPad, StringSplit, Substring, Upper} -import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, ConcatWs, Expression, If, InitCap, IsNull, Left, Length, Like, Literal, Lower, ParseUrl, RegExpReplace, Right, RLike, StringLPad, StringRepeat, StringRPad, StringSplit, Substring, Upper} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, ObjectType, StringType} import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode, RegExp} import org.apache.comet.serde.ExprOuterClass.Expr -import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} +import org.apache.comet.serde.QueryPlanSerde._ object CometStringRepeat extends CometExpressionSerde[StringRepeat] { @@ -382,6 +383,66 @@ object CometStringSplit extends CometExpressionSerde[StringSplit] { } } +object CometParseUrl + extends CometExpressionSerde[ParseUrl] + with CometInvokeExpressionSerde { + + // In Spark 4.0, ParseUrl became RuntimeReplaceable and the analyser rewrites it to + // Invoke(ParseUrlEvaluator.evaluate, url, part[, key]). The first child is a + // Literal(evaluator, ObjectType(ParseUrlEvaluator)). This class name is the key + // used by QueryPlanSerde to route the Invoke node to this handler. + override val invokeTargetClassName: String = + "org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator" + + // Extracts the failOnError flag from the ParseUrlEvaluator instance via reflection. + // Falls back to SQLConf.get.ansiEnabled when reflection is unavailable (null evaluator + // or renamed accessor in a future Spark version). + private def failOnErrorFromEvaluator(evaluator: AnyRef): Boolean = + try { + evaluator.getClass.getMethod("failOnError").invoke(evaluator).asInstanceOf[Boolean] + } catch { + case _: ReflectiveOperationException => SQLConf.get.ansiEnabled + } + + override def convertFromInvoke( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + // The first child is Literal(evaluator, ObjectType(ParseUrlEvaluator)). + // Strip it and read failOnError from it; the remaining children are (url, part[, key]). + val (urlArgs, failOnError) = expr.children match { + case Literal(evaluator, objectType: ObjectType) +: rest + if objectType.cls.getName == invokeTargetClassName => + val foe = + if (evaluator != null) failOnErrorFromEvaluator(evaluator.asInstanceOf[AnyRef]) + else SQLConf.get.ansiEnabled + (rest, foe) + case args => + (args, SQLConf.get.ansiEnabled) + } + toProto(expr, urlArgs, failOnError, inputs, binding) + } + + // In Spark 3.5, ParseUrl is a concrete expression node with a `failOnError` field + // that is directly accessible without reflection. + override def convert(expr: ParseUrl, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = + toProto(expr, expr.children, expr.failOnError, inputs, binding) + + // Serializes (url, part[, key]) arguments into the appropriate native function call. + // Uses parse_url (ANSI/strict) or try_parse_url (legacy/lenient) depending on failOnError. + private def toProto( + expr: Expression, + urlArgs: Seq[Expression], + failOnError: Boolean, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val functionName = if (failOnError) "parse_url" else "try_parse_url" + val childExprs = urlArgs.map(exprToProtoInternal(_, inputs, binding)) + val optExpr = scalarFunctionExprToProto(functionName, childExprs: _*) + optExprWithInfo(optExpr, expr, urlArgs: _*) + } +} + trait CommonStringExprs { def stringDecode( diff --git a/spark/src/test/resources/sql-tests/expressions/string/parse_url.sql b/spark/src/test/resources/sql-tests/expressions/string/parse_url.sql new file mode 100644 index 0000000000..7e0f8c5ac9 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/string/parse_url.sql @@ -0,0 +1,61 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ConfigMatrix: parquet.enable.dictionary=false,true +-- MinSparkVersion: 3.5 + +statement +CREATE TABLE test_parse_url(url string) USING parquet + +statement +INSERT INTO test_parse_url VALUES + ('http://spark.apache.org/path?query=1'), + ('https://spark.apache.org/path/to/page?query=1&k2=v2'), + ('ftp://user:pwd@ftp.example.com:2121/files?x=1#frag'), + (NULL) + +query +SELECT parse_url(url, 'HOST') FROM test_parse_url + +query +SELECT parse_url(url, 'QUERY') FROM test_parse_url + +query +SELECT parse_url(url, 'PROTOCOL') FROM test_parse_url + +query +SELECT parse_url(url, 'QUERY', 'query'), parse_url(url, 'QUERY', 'k2') FROM test_parse_url + +query +SELECT parse_url(url, 'PATH') FROM test_parse_url + +query +SELECT parse_url(url, 'FILE') FROM test_parse_url + +query +SELECT parse_url(url, 'REF') FROM test_parse_url + +query +SELECT parse_url(url, 'AUTHORITY') FROM test_parse_url + +query +SELECT parse_url(url, 'USERINFO') FROM test_parse_url + +-- Note: try_parse_url is a Comet-internal DataFusion function name used when serializing +-- parse_url with failOnError=false. It is not a registered Spark SQL function and cannot +-- be called directly from SQL. The NULL-on-error behaviour is covered by the +-- "parse_url with invalid URL in legacy mode" Scala test. diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index 121d7f7d5a..b4a3ec8c43 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} class CometStringExpressionSuite extends CometTestBase { @@ -248,6 +249,60 @@ class CometStringExpressionSuite extends CometTestBase { } } + test("parse_url") { + withParquetTable( + Seq( + ("http://spark.apache.org/path?query=1", 0), + ("https://spark.apache.org/path/to/page?query=1&k2=v2", 1), + ("ftp://user:pwd@ftp.example.com:2121/files?x=1#frag", 2), + (null, 3)), + "tbl_parse_url") { + + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'HOST') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'QUERY') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'PROTOCOL') FROM tbl_parse_url") + checkSparkAnswerAndOperator( + "SELECT parse_url(_1, 'QUERY', 'query'), parse_url(_1, 'QUERY', 'k2') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'PATH') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'FILE') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'REF') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'AUTHORITY') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'USERINFO') FROM tbl_parse_url") + } + } + + test("parse_url in ANSI mode (Spark 3.5)") { + assume(!isSpark40Plus) + + withParquetTable( + Seq(("http://spark.apache.org/path?query=1", 0), (null, 1)), + "tbl_parse_url_ansi") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'HOST') FROM tbl_parse_url_ansi") + } + } + } + + test("parse_url with invalid URL in legacy mode") { + assume(isSpark40Plus) + + withParquetTable( + Seq( + ("http://spark.apache.org/path?query=1", 0), + ("http://spark.apache.org:abc/path", 1), + (null, 2)), + "tbl_parse_url_invalid") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'HOST') FROM tbl_parse_url_invalid") + } + } + } + + // Note: try_parse_url is a Comet-internal DataFusion function name used when serializing + // parse_url with failOnError=false. It is not a registered Spark SQL function and cannot + // be called directly from SQL. The NULL-on-error behaviour is covered by the + // "parse_url with invalid URL in legacy mode" test above. + test("Various String scalar functions") { val table = "names" withTable(table) {