From 1fd729cc9123126e390032a62a5a83ce3dee1752 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Fri, 20 Feb 2026 21:31:12 +0100 Subject: [PATCH 1/8] add support parse_url --- docs/spark_expressions_support.md | 2 +- native/core/src/execution/jni_api.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 4 ++ .../org/apache/comet/serde/strings.scala | 32 ++++++++++++++- .../expressions/string/parse_url.sql | 40 +++++++++++++++++++ .../comet/CometStringExpressionSuite.scala | 23 +++++++++++ 6 files changed, 100 insertions(+), 3 deletions(-) create mode 100644 spark/src/test/resources/sql-tests/expressions/string/parse_url.sql diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md index 5474894108..0163a0a16f 100644 --- a/docs/spark_expressions_support.md +++ b/docs/spark_expressions_support.md @@ -470,7 +470,7 @@ ### url_funcs -- [ ] parse_url +- [x] parse_url - [ ] url_decode - [ ] url_encode diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 436e5e99c5..9f8bb17db9 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -55,6 +55,7 @@ use datafusion_spark::function::math::hex::SparkHex; use datafusion_spark::function::math::width_bucket::SparkWidthBucket; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; +use datafusion_spark::function::url::parse_url::ParseUrl; use futures::poll; use futures::stream::StreamExt; use jni::objects::JByteBuffer; @@ -377,6 +378,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkWidthBucket::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCrc32::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(ParseUrl::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 9d13ccd9ed..0fa4c951be 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -159,6 +159,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[Like] -> CometLike, classOf[Lower] -> CometLower, classOf[OctetLength] -> CometScalarFunction("octet_length"), + classOf[ParseUrl] -> CometParseUrl, classOf[RegExpReplace] -> CometRegExpReplace, classOf[Reverse] -> CometReverse, classOf[RLike] -> CometRLike, @@ -557,6 +558,9 @@ object QueryPlanSerde extends Logging with CometExprShim { // `PromotePrecision` is just a wrapper, don't need to serialize it. exprToProtoInternal(child, inputs, binding) + case expr if expr.prettyName == "parse_url" => + CometParseUrl.convertExpression(expr, inputs, binding) + case expr => QueryPlanSerde.exprSerdeMap.get(expr.getClass) match { case Some(handler) => diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 64ba644048..fe2f974605 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, ConcatWs, Expression, If, InitCap, IsNull, Left, Length, Like, Literal, Lower, RegExpReplace, Right, RLike, StringLPad, StringRepeat, StringRPad, StringSplit, Substring, Upper} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, ConcatWs, Expression, If, InitCap, IsNull, Left, Length, Like, Literal, Lower, ParseUrl, RegExpReplace, Right, RLike, StringLPad, StringRepeat, StringRPad, StringSplit, Substring, Upper} import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType} import org.apache.spark.unsafe.types.UTF8String @@ -29,7 +29,7 @@ import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode, RegExp} import org.apache.comet.serde.ExprOuterClass.Expr -import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} +import org.apache.comet.serde.QueryPlanSerde._ object CometStringRepeat extends CometExpressionSerde[StringRepeat] { @@ -382,6 +382,34 @@ object CometStringSplit extends CometExpressionSerde[StringSplit] { } } +object CometParseUrl extends CometExpressionSerde[ParseUrl] { + private def convertInternal( + expr: Expression, + rawChildren: Seq[Expression], + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val parseUrlArgs: Seq[Expression] = rawChildren.lastOption match { + case Some(Literal(_: Boolean, _)) => rawChildren.dropRight(1) + case Some(Literal(_: java.lang.Boolean, _)) => rawChildren.dropRight(1) + case _ => rawChildren + } + val childExprs: Seq[Option[Expr]] = parseUrlArgs.map(exprToProtoInternal(_, inputs, binding)) + val optExpr: Option[Expr] = scalarFunctionExprToProto("parse_url", childExprs: _*) + optExprWithInfo(optExpr, expr, parseUrlArgs: _*) + } + + def convertExpression( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + convertInternal(expr, expr.children, inputs, binding) + } + + override def convert(expr: ParseUrl, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { + convertInternal(expr, expr.children, inputs, binding) + } +} + trait CommonStringExprs { def stringDecode( diff --git a/spark/src/test/resources/sql-tests/expressions/string/parse_url.sql b/spark/src/test/resources/sql-tests/expressions/string/parse_url.sql new file mode 100644 index 0000000000..c66e5e99e3 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/string/parse_url.sql @@ -0,0 +1,40 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ConfigMatrix: parquet.enable.dictionary=false,true +-- MinSparkVersion: 3.5 + +statement +CREATE TABLE test_parse_url(url string) USING parquet + +statement +INSERT INTO test_parse_url VALUES + ('http://spark.apache.org/path?query=1'), + ('https://spark.apache.org/path/to/page?query=1&k2=v2'), + (NULL) + +query +SELECT parse_url(url, 'HOST') FROM test_parse_url + +query +SELECT parse_url(url, 'QUERY') FROM test_parse_url + +query +SELECT parse_url(url, 'PROTOCOL') FROM test_parse_url + +query +SELECT parse_url(url, 'QUERY', 'query'), parse_url(url, 'QUERY', 'k2') FROM test_parse_url diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index 121d7f7d5a..0141a2020c 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -248,6 +248,29 @@ class CometStringExpressionSuite extends CometTestBase { } } + test("parse_url") { + withParquetTable( + Seq( + ("http://spark.apache.org/path?query=1", 0), + ("https://spark.apache.org/path/to/page?query=1&k2=v2", 1), + (null, 2)), + "tbl_parse_url") { + + val df = spark.sql("SELECT parse_url(_1, 'PATH') FROM tbl_parse_url") + df.show(20, false) + val df2 = spark.sql("SELECT parse_url(_1, 'FILE') FROM tbl_parse_url") + df2.show(20, false) + + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'HOST') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'QUERY') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'PROTOCOL') FROM tbl_parse_url") + checkSparkAnswerAndOperator( + "SELECT parse_url(_1, 'QUERY', 'query'), parse_url(_1, 'QUERY', 'k2') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'PATH') FROM tbl_parse_url") + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'FILE') FROM tbl_parse_url") + } + } + test("Various String scalar functions") { val table = "names" withTable(table) { From 82d8372f93aa1376dbbcbcd80550b14e9c152ae8 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sun, 22 Feb 2026 09:02:10 +0100 Subject: [PATCH 2/8] edit --- .../scala/org/apache/comet/CometStringExpressionSuite.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index 0141a2020c..d99e36c4cd 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -256,11 +256,6 @@ class CometStringExpressionSuite extends CometTestBase { (null, 2)), "tbl_parse_url") { - val df = spark.sql("SELECT parse_url(_1, 'PATH') FROM tbl_parse_url") - df.show(20, false) - val df2 = spark.sql("SELECT parse_url(_1, 'FILE') FROM tbl_parse_url") - df2.show(20, false) - checkSparkAnswerAndOperator("SELECT parse_url(_1, 'HOST') FROM tbl_parse_url") checkSparkAnswerAndOperator("SELECT parse_url(_1, 'QUERY') FROM tbl_parse_url") checkSparkAnswerAndOperator("SELECT parse_url(_1, 'PROTOCOL') FROM tbl_parse_url") From 9733527e07e1b4782625dbceab0f80d5315eceb1 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Tue, 24 Feb 2026 07:39:35 +0100 Subject: [PATCH 3/8] fix parse_url Spark 4 Invoke rewrite and legacy null semantics --- native/core/src/execution/jni_api.rs | 2 ++ .../org/apache/comet/serde/strings.scala | 27 ++++++++++++++++--- .../apache/comet/shims/CometExprShim.scala | 17 ++++++++++-- .../comet/CometStringExpressionSuite.scala | 16 +++++++++++ 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 98591a2835..b12370c7f2 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -56,6 +56,7 @@ use datafusion_spark::function::math::width_bucket::SparkWidthBucket; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; use datafusion_spark::function::url::parse_url::ParseUrl; +use datafusion_spark::function::url::try_parse_url::TryParseUrl; use futures::poll; use futures::stream::StreamExt; use jni::objects::JByteBuffer; @@ -402,6 +403,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCrc32::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(ParseUrl::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(TryParseUrl::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index fe2f974605..56a0e2bc9f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -383,9 +383,18 @@ object CometStringSplit extends CometExpressionSerde[StringSplit] { } object CometParseUrl extends CometExpressionSerde[ParseUrl] { + private def failOnErrorFromChildren(rawChildren: Seq[Expression]): Option[Boolean] = { + rawChildren.lastOption.flatMap { + case Literal(value: Boolean, _) => Some(value) + case Literal(value: java.lang.Boolean, _) => Some(value.booleanValue()) + case _ => None + } + } + private def convertInternal( expr: Expression, rawChildren: Seq[Expression], + failOnError: Option[Boolean], inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { val parseUrlArgs: Seq[Expression] = rawChildren.lastOption match { @@ -393,20 +402,30 @@ object CometParseUrl extends CometExpressionSerde[ParseUrl] { case Some(Literal(_: java.lang.Boolean, _)) => rawChildren.dropRight(1) case _ => rawChildren } + + val shouldFailOnError: Boolean = + failOnError.orElse(failOnErrorFromChildren(rawChildren)).getOrElse(true) + val functionName: String = if (shouldFailOnError) { + "parse_url" + } else { + "try_parse_url" + } + val childExprs: Seq[Option[Expr]] = parseUrlArgs.map(exprToProtoInternal(_, inputs, binding)) - val optExpr: Option[Expr] = scalarFunctionExprToProto("parse_url", childExprs: _*) + val optExpr: Option[Expr] = scalarFunctionExprToProto(functionName, childExprs: _*) optExprWithInfo(optExpr, expr, parseUrlArgs: _*) } def convertExpression( expr: Expression, inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - convertInternal(expr, expr.children, inputs, binding) + binding: Boolean, + failOnError: Option[Boolean] = None): Option[Expr] = { + convertInternal(expr, expr.children, failOnError, inputs, binding) } override def convert(expr: ParseUrl, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { - convertInternal(expr, expr.children, inputs, binding) + convertInternal(expr, expr.children, None, inputs, binding) } } diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 1d4427d159..2a461c8afb 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -20,14 +20,15 @@ package org.apache.comet.shims import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke +import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} +import org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.types.StringTypeWithCollation import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringType} import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CometParseUrl, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} @@ -68,6 +69,18 @@ trait CometExprShim extends CommonStringExprs { val Seq(bin, charset, _, _) = s.arguments stringDecode(expr, charset, bin, inputs, binding) + case i: Invoke if i.functionName == "evaluate" => + i.targetObject match { + case Literal(parseUrlEvaluator: ParseUrlEvaluator, _) => + CometParseUrl.convertExpression( + i, + inputs, + binding, + Some(parseUrlEvaluator.failOnError)) + case _ => + None + } + case expr @ ToPrettyString(child, timeZoneId) => val castSupported = CometCast.isSupported( child.dataType, diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index d99e36c4cd..53dc2eda67 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} class CometStringExpressionSuite extends CometTestBase { @@ -266,6 +267,21 @@ class CometStringExpressionSuite extends CometTestBase { } } + test("parse_url with invalid URL in legacy mode") { + assume(isSpark40Plus) + + withParquetTable( + Seq( + ("http://spark.apache.org/path?query=1", 0), + ("http://spark.apache.org:abc/path", 1), + (null, 2)), + "tbl_parse_url_invalid") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { + checkSparkAnswerAndOperator("SELECT parse_url(_1, 'HOST') FROM tbl_parse_url_invalid") + } + } + } + test("Various String scalar functions") { val table = "names" withTable(table) { From e268246334a38c0edb8f7e5ec0303da6fa1c49ba Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Thu, 26 Feb 2026 11:44:50 +0100 Subject: [PATCH 4/8] edit --- .../apache/comet/serde/QueryPlanSerde.scala | 35 +++++++++++- .../org/apache/comet/serde/strings.scala | 57 ++++++++++++++++--- .../apache/comet/shims/CometExprShim.scala | 17 +----- 3 files changed, 85 insertions(+), 24 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 0fa4c951be..af6b09ac02 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -44,6 +44,37 @@ import org.apache.comet.shims.CometExprShim */ object QueryPlanSerde extends Logging with CometExprShim { + // Generic serializer contract for Spark Invoke expressions. + private type InvokeConverter = (Expression, Seq[Attribute], Boolean) => Option[Expr] + + // Dispatch table keyed by the runtime class name stored in Invoke target ObjectType. + private val invokeConvertersByTargetClassName: Map[String, InvokeConverter] = Map( + CometParseUrl.invokeTargetClassName -> + ((expr: Expression, inputs: Seq[Attribute], binding: Boolean) => + CometParseUrl.convertExpression(expr, inputs, binding))) + + // Extracts the target object class name from an Invoke-like expression. + private def invokeTargetClassName(expr: Expression): Option[String] = { + expr.children.headOption.flatMap { + _.dataType match { + case objectType: ObjectType => Some(objectType.cls.getName) + case _ => None + } + } + } + + // Routes Invoke expressions to a converter based on the target object class name. + private def convertInvokeExpression( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = + for { + innerObjectTypeExpression <- invokeTargetClassName(expr) + invokeExpr <- invokeConvertersByTargetClassName.get(innerObjectTypeExpression) + expression <- invokeExpr(expr, inputs, binding) + + } yield expression + private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[ArrayAppend] -> CometArrayAppend, classOf[ArrayCompact] -> CometArrayCompact, @@ -558,8 +589,8 @@ object QueryPlanSerde extends Logging with CometExprShim { // `PromotePrecision` is just a wrapper, don't need to serialize it. exprToProtoInternal(child, inputs, binding) - case expr if expr.prettyName == "parse_url" => - CometParseUrl.convertExpression(expr, inputs, binding) + case expr if expr.prettyName == "invoke" => + convertInvokeExpression(expr, inputs, binding) case expr => QueryPlanSerde.exprSerdeMap.get(expr.getClass) match { diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 56a0e2bc9f..7db30beb2d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -22,7 +22,8 @@ package org.apache.comet.serde import java.util.Locale import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, ConcatWs, Expression, If, InitCap, IsNull, Left, Length, Like, Literal, Lower, ParseUrl, RegExpReplace, Right, RLike, StringLPad, StringRepeat, StringRPad, StringSplit, Substring, Upper} -import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, ObjectType, StringType} import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometConf @@ -383,6 +384,37 @@ object CometStringSplit extends CometExpressionSerde[StringSplit] { } object CometParseUrl extends CometExpressionSerde[ParseUrl] { + val invokeTargetClassName: String = + "org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator" + + private def parseUrlFailOnErrorFromInvoke(expr: Expression): Option[Boolean] = { + expr.children.headOption match { + case Some(Literal(evaluator, objectType: ObjectType)) + if evaluator != null && objectType.cls.getName == invokeTargetClassName => + try { + val failOnErrorMethod = evaluator.getClass.getMethod("failOnError") + Some(failOnErrorMethod.invoke(evaluator).asInstanceOf[Boolean]) + } catch { + case _: ReflectiveOperationException => Some(SQLConf.get.ansiEnabled) + } + case Some(Literal(_, objectType: ObjectType)) + if objectType.cls.getName == invokeTargetClassName => + Some(SQLConf.get.ansiEnabled) + case _ => + None + } + } + + private def dropParseUrlEvaluator(rawChildren: Seq[Expression]): Seq[Expression] = { + rawChildren.headOption match { + case Some(Literal(_, objectType: ObjectType)) + if objectType.cls.getName == invokeTargetClassName => + rawChildren.drop(1) + case _ => + rawChildren + } + } + private def failOnErrorFromChildren(rawChildren: Seq[Expression]): Option[Boolean] = { rawChildren.lastOption.flatMap { case Literal(value: Boolean, _) => Some(value) @@ -397,14 +429,15 @@ object CometParseUrl extends CometExpressionSerde[ParseUrl] { failOnError: Option[Boolean], inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { - val parseUrlArgs: Seq[Expression] = rawChildren.lastOption match { - case Some(Literal(_: Boolean, _)) => rawChildren.dropRight(1) - case Some(Literal(_: java.lang.Boolean, _)) => rawChildren.dropRight(1) - case _ => rawChildren + val sanitizedChildren: Seq[Expression] = dropParseUrlEvaluator(rawChildren) + val parseUrlArgs: Seq[Expression] = sanitizedChildren.lastOption match { + case Some(Literal(_: Boolean, _)) => sanitizedChildren.dropRight(1) + case Some(Literal(_: java.lang.Boolean, _)) => sanitizedChildren.dropRight(1) + case _ => sanitizedChildren } val shouldFailOnError: Boolean = - failOnError.orElse(failOnErrorFromChildren(rawChildren)).getOrElse(true) + failOnError.orElse(failOnErrorFromChildren(sanitizedChildren)).getOrElse(true) val functionName: String = if (shouldFailOnError) { "parse_url" } else { @@ -421,7 +454,17 @@ object CometParseUrl extends CometExpressionSerde[ParseUrl] { inputs: Seq[Attribute], binding: Boolean, failOnError: Option[Boolean] = None): Option[Expr] = { - convertInternal(expr, expr.children, failOnError, inputs, binding) + expr.prettyName match { + case "parse_url" => + convertInternal(expr, expr.children, failOnError, inputs, binding) + case "invoke" => + failOnError + .orElse(parseUrlFailOnErrorFromInvoke(expr)) + .flatMap(inferredFailOnError => + convertInternal(expr, expr.children, Some(inferredFailOnError), inputs, binding)) + case _ => + None + } } override def convert(expr: ParseUrl, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 2a461c8afb..1d4427d159 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -20,15 +20,14 @@ package org.apache.comet.shims import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} -import org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator +import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.types.StringTypeWithCollation import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringType} import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CometParseUrl, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} @@ -69,18 +68,6 @@ trait CometExprShim extends CommonStringExprs { val Seq(bin, charset, _, _) = s.arguments stringDecode(expr, charset, bin, inputs, binding) - case i: Invoke if i.functionName == "evaluate" => - i.targetObject match { - case Literal(parseUrlEvaluator: ParseUrlEvaluator, _) => - CometParseUrl.convertExpression( - i, - inputs, - binding, - Some(parseUrlEvaluator.failOnError)) - case _ => - None - } - case expr @ ToPrettyString(child, timeZoneId) => val castSupported = CometCast.isSupported( child.dataType, From 7787426193764b69590f71627a3e517585cdf9db Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Wed, 4 Mar 2026 12:57:15 +0100 Subject: [PATCH 5/8] chore: apply spotless formatting --- .../scala/org/apache/comet/CometStringExpressionSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index b888bb6fd8..2304defc03 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -275,9 +275,7 @@ class CometStringExpressionSuite extends CometTestBase { assume(!isSpark40Plus) withParquetTable( - Seq( - ("http://spark.apache.org/path?query=1", 0), - (null, 1)), + Seq(("http://spark.apache.org/path?query=1", 0), (null, 1)), "tbl_parse_url_ansi") { withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { checkSparkAnswerAndOperator("SELECT parse_url(_1, 'HOST') FROM tbl_parse_url_ansi") From 9b114fb7adca0148cc89530dd6dde3962def7e88 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Wed, 4 Mar 2026 13:33:29 +0100 Subject: [PATCH 6/8] test: fix parse_url test suite - Remove try_parse_url SQL queries: try_parse_url is a Comet-internal DataFusion function name used when serializing parse_url with failOnError=false. It is not a registered Spark SQL function and calling it directly via SQL raises UNRESOLVED_ROUTINE on any Spark version. The NULL-on-invalid-URL behaviour is covered by the 'parse_url with invalid URL in legacy mode' Scala test. - Replace ftp port 21 with 2121 in test URLs: the Rust url crate omits well-known default ports (ftp=21) when serialising authority(), while Java URI.getRawAuthority() preserves them verbatim. Using a non-default port avoids this pre-existing semantic gap and keeps the AUTHORITY test case meaningful on both Spark 3.5 and 4.0. --- .../expressions/string/parse_url.sql | 13 +++++------ .../comet/CometStringExpressionSuite.scala | 22 +++++-------------- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/spark/src/test/resources/sql-tests/expressions/string/parse_url.sql b/spark/src/test/resources/sql-tests/expressions/string/parse_url.sql index ddcc02c9b2..7e0f8c5ac9 100644 --- a/spark/src/test/resources/sql-tests/expressions/string/parse_url.sql +++ b/spark/src/test/resources/sql-tests/expressions/string/parse_url.sql @@ -25,7 +25,7 @@ statement INSERT INTO test_parse_url VALUES ('http://spark.apache.org/path?query=1'), ('https://spark.apache.org/path/to/page?query=1&k2=v2'), - ('ftp://user:pwd@ftp.example.com:21/files?x=1#frag'), + ('ftp://user:pwd@ftp.example.com:2121/files?x=1#frag'), (NULL) query @@ -55,10 +55,7 @@ SELECT parse_url(url, 'AUTHORITY') FROM test_parse_url query SELECT parse_url(url, 'USERINFO') FROM test_parse_url --- try_parse_url: same semantics as parse_url in ANSI mode but returns NULL instead of error --- on malformed input; tested here so it runs on both Spark 3.5 and 4.0 -query -SELECT try_parse_url(url, 'HOST') FROM test_parse_url - -query -SELECT try_parse_url(url, 'QUERY', 'x') FROM test_parse_url +-- Note: try_parse_url is a Comet-internal DataFusion function name used when serializing +-- parse_url with failOnError=false. It is not a registered Spark SQL function and cannot +-- be called directly from SQL. The NULL-on-error behaviour is covered by the +-- "parse_url with invalid URL in legacy mode" Scala test. diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index 2304defc03..b4a3ec8c43 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -254,7 +254,7 @@ class CometStringExpressionSuite extends CometTestBase { Seq( ("http://spark.apache.org/path?query=1", 0), ("https://spark.apache.org/path/to/page?query=1&k2=v2", 1), - ("ftp://user:pwd@ftp.example.com:21/files?x=1#frag", 2), + ("ftp://user:pwd@ftp.example.com:2121/files?x=1#frag", 2), (null, 3)), "tbl_parse_url") { @@ -298,22 +298,10 @@ class CometStringExpressionSuite extends CometTestBase { } } - test("try_parse_url") { - withParquetTable( - Seq( - ("http://spark.apache.org/path?query=1", 0), - ("https://spark.apache.org/path/to/page?query=1&k2=v2", 1), - ("not_a_valid_url_with_colon://broken:abc/", 2), - (null, 3)), - "tbl_try_parse_url") { - - checkSparkAnswerAndOperator("SELECT try_parse_url(_1, 'HOST') FROM tbl_try_parse_url") - checkSparkAnswerAndOperator("SELECT try_parse_url(_1, 'QUERY') FROM tbl_try_parse_url") - checkSparkAnswerAndOperator("SELECT try_parse_url(_1, 'PROTOCOL') FROM tbl_try_parse_url") - checkSparkAnswerAndOperator( - "SELECT try_parse_url(_1, 'QUERY', 'query') FROM tbl_try_parse_url") - } - } + // Note: try_parse_url is a Comet-internal DataFusion function name used when serializing + // parse_url with failOnError=false. It is not a registered Spark SQL function and cannot + // be called directly from SQL. The NULL-on-error behaviour is covered by the + // "parse_url with invalid URL in legacy mode" test above. test("Various String scalar functions") { val table = "names" From 9d7720fb88c98a40fef1c55d2319c4bf351302e7 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Wed, 4 Mar 2026 13:59:43 +0100 Subject: [PATCH 7/8] refactor: introduce CometInvokeExpressionSerde trait for generic Invoke dispatch --- .../serde/CometInvokeExpressionSerde.scala | 81 +++++++++++++++ .../apache/comet/serde/QueryPlanSerde.scala | 26 +++-- .../org/apache/comet/serde/strings.scala | 99 ++++++++----------- 3 files changed, 132 insertions(+), 74 deletions(-) create mode 100644 spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala diff --git a/spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala b/spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala new file mode 100644 index 0000000000..1f95c2e2f8 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} + +/** + * Serializer contract for Spark `Invoke` expressions that wrap a private evaluator object. + * + * In Spark 4.0 several built-in functions (e.g. `parse_url`) became `RuntimeReplaceable` and are + * rewritten by the analyser into an `Invoke(evaluator, arg, ...)` node whose first child is a + * `Literal` of `ObjectType` holding a private evaluator instance. The Spark expression class that + * Comet normally dispatches on (e.g. `ParseUrl`) is therefore never seen at serde time on Spark + * 4.0. + * + * Implementors expose: + * - [[invokeTargetClassName]] - the fully-qualified name of the evaluator class embedded in the + * first `Literal(_, ObjectType)` child of the `Invoke` node. This is the key used by + * [[QueryPlanSerde]] to route the node to the correct handler. + * - [[convertFromInvoke]] - the actual serialization logic, receiving the raw `Invoke` + * expression (unerased, with all children including the evaluator literal). + * + * To register a new handler, add the object to [[QueryPlanSerde.invokeSerdeByTargetClassName]]. + */ +trait CometInvokeExpressionSerde[T <: Expression] { + + /** + * Fully-qualified class name of the private evaluator object held in the first child + * `Literal(_, ObjectType(...))` of the `Invoke` node. + * + * Example: `"org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator"` + */ + def invokeTargetClassName: String + + /** + * Serialize the `Invoke` expression into a Comet proto `Expr`. + * + * @param expr + * The raw `Invoke` expression node (first child is the evaluator literal). + * @param inputs + * Resolved input attributes for the enclosing operator. + * @param binding + * Whether attributes are bound (relevant for aggregate expressions). + * @return + * `Some(Expr)` on success, `None` if the expression cannot be handled (the implementor is + * responsible for calling `withInfo` to record the reason). + */ + def convertFromInvoke( + expr: T, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] + + /** + * Unchecked entry point used by [[QueryPlanSerde]] to dispatch through an existential + * `CometInvokeExpressionSerde[_]`. Casts `expr` to `T` before delegating to + * [[convertFromInvoke]]. Only call this when you have already verified (via + * [[invokeTargetClassName]]) that the handler owns this `Invoke` node. + */ + final def convertFromInvokeUnchecked( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = + convertFromInvoke(expr.asInstanceOf[T], inputs, binding) +} diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index e5575b9cb1..c15b51f4fe 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -44,14 +44,13 @@ import org.apache.comet.shims.CometExprShim */ object QueryPlanSerde extends Logging with CometExprShim { - // Generic serializer contract for Spark Invoke expressions. - private type InvokeConverter = (Expression, Seq[Attribute], Boolean) => Option[Expr] - - // Dispatch table keyed by the runtime class name stored in Invoke target ObjectType. - private val invokeConvertersByTargetClassName: Map[String, InvokeConverter] = Map( - CometParseUrl.invokeTargetClassName -> - ((expr: Expression, inputs: Seq[Attribute], binding: Boolean) => - CometParseUrl.convertFromInvoke(expr, inputs, binding))) + // Registry of Invoke-expression handlers, keyed by the fully-qualified class name of the + // evaluator object embedded in the first Literal(_, ObjectType(...)) child of the Invoke node. + // To support a new RuntimeReplaceable expression rewritten to Invoke in Spark 4.0, implement + // CometInvokeExpressionSerde and add the object here. + private val invokeSerdeByTargetClassName + : Map[String, CometInvokeExpressionSerde[_ <: Expression]] = + Seq(CometParseUrl).map(s => s.invokeTargetClassName -> s).toMap // Extracts the target object class name from an Invoke-like expression. private def invokeTargetClassName(expr: Expression): Option[String] = { @@ -63,17 +62,16 @@ object QueryPlanSerde extends Logging with CometExprShim { } } - // Routes Invoke expressions to a converter based on the target object class name. + // Routes Invoke expressions to a handler based on the target object class name. private def convertInvokeExpression( expr: Expression, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = for { - innerObjectTypeExpression <- invokeTargetClassName(expr) - invokeExpr <- invokeConvertersByTargetClassName.get(innerObjectTypeExpression) - expression <- invokeExpr(expr, inputs, binding) - - } yield expression + targetClassName <- invokeTargetClassName(expr) + handler <- invokeSerdeByTargetClassName.get(targetClassName) + result <- handler.convertFromInvokeUnchecked(expr, inputs, binding) + } yield result private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[ArrayAppend] -> CometArrayAppend, diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 440471798e..88051a729e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -383,55 +383,53 @@ object CometStringSplit extends CometExpressionSerde[StringSplit] { } } -object CometParseUrl extends CometExpressionSerde[ParseUrl] { - - // Class name of the Spark 4.0 internal evaluator embedded in the Invoke node that replaces - // ParseUrl at analysis time (RuntimeReplaceable). This constant is used as the dispatch key - // in QueryPlanSerde.invokeConvertersByTargetClassName. - val invokeTargetClassName: String = +object CometParseUrl + extends CometExpressionSerde[ParseUrl] + with CometInvokeExpressionSerde[ParseUrl] { + + // In Spark 4.0, ParseUrl became RuntimeReplaceable and the analyser rewrites it to + // Invoke(ParseUrlEvaluator.evaluate, url, part[, key]). The first child is a + // Literal(evaluator, ObjectType(ParseUrlEvaluator)). This class name is the key + // used by QueryPlanSerde to route the Invoke node to this handler. + override val invokeTargetClassName: String = "org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator" - // --------------------------------------------------------------------------- - // Spark 4.0 Invoke path helpers - // --------------------------------------------------------------------------- - - // Extracts the failOnError flag from the ParseUrlEvaluator instance embedded in the - // Invoke literal. Uses reflection because ParseUrlEvaluator is a private class. - // Falls back to SQLConf.get.ansiEnabled when reflection fails (evaluator is null, or - // the method has been renamed in a future Spark version). - private[serde] def failOnErrorFromInvoke(expr: Expression): Boolean = - expr.children.headOption match { - case Some(Literal(evaluator, objectType: ObjectType)) - if evaluator != null && objectType.cls.getName == invokeTargetClassName => - try { - evaluator.getClass - .getMethod("failOnError") - .invoke(evaluator) - .asInstanceOf[Boolean] - } catch { - case _: ReflectiveOperationException => SQLConf.get.ansiEnabled - } - case _ => - SQLConf.get.ansiEnabled + // Extracts the failOnError flag from the ParseUrlEvaluator instance via reflection. + // Falls back to SQLConf.get.ansiEnabled when reflection is unavailable (null evaluator + // or renamed accessor in a future Spark version). + private def failOnErrorFromEvaluator(evaluator: AnyRef): Boolean = + try { + evaluator.getClass.getMethod("failOnError").invoke(evaluator).asInstanceOf[Boolean] + } catch { + case _: ReflectiveOperationException => SQLConf.get.ansiEnabled } - // Drops the leading ParseUrlEvaluator literal from the Invoke children list, leaving - // only the actual URL/part/key arguments. - private def dropEvaluatorLiteral(children: Seq[Expression]): Seq[Expression] = - children.headOption match { - case Some(Literal(_, objectType: ObjectType)) + override def convertFromInvoke( + expr: ParseUrl, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + // The first child is Literal(evaluator, ObjectType(ParseUrlEvaluator)). + // Strip it and read failOnError from it; the remaining children are (url, part[, key]). + val (urlArgs, failOnError) = expr.children match { + case Literal(evaluator, objectType: ObjectType) +: rest if objectType.cls.getName == invokeTargetClassName => - children.drop(1) - case _ => - children + val foe = + if (evaluator != null) failOnErrorFromEvaluator(evaluator.asInstanceOf[AnyRef]) + else SQLConf.get.ansiEnabled + (rest, foe) + case args => + (args, SQLConf.get.ansiEnabled) } + toProto(expr, urlArgs, failOnError, inputs, binding) + } - // --------------------------------------------------------------------------- - // Core serialization - // --------------------------------------------------------------------------- + // In Spark 3.5, ParseUrl is a concrete expression node with a `failOnError` field + // that is directly accessible without reflection. + override def convert(expr: ParseUrl, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = + toProto(expr, expr.children, expr.failOnError, inputs, binding) - // Converts the parse_url/try_parse_url arguments into a proto Expr. - // `urlArgs` must already be stripped of the evaluator literal and the failOnError flag literal. + // Serializes (url, part[, key]) arguments into the appropriate native function call. + // Uses parse_url (ANSI/strict) or try_parse_url (legacy/lenient) depending on failOnError. private def toProto( expr: Expression, urlArgs: Seq[Expression], @@ -443,25 +441,6 @@ object CometParseUrl extends CometExpressionSerde[ParseUrl] { val optExpr = scalarFunctionExprToProto(functionName, childExprs: _*) optExprWithInfo(optExpr, expr, urlArgs: _*) } - - // --------------------------------------------------------------------------- - // Public entry points - // --------------------------------------------------------------------------- - - // Spark 3.5 path: ParseUrl is a concrete expression node with a `failOnError` field. - override def convert(expr: ParseUrl, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = - toProto(expr, expr.children, expr.failOnError, inputs, binding) - - // Spark 4.0 path: ParseUrl is replaced by Invoke(ParseUrlEvaluator, url, part[, key]). - // Called from QueryPlanSerde.convertInvokeExpression via invokeConvertersByTargetClassName. - def convertFromInvoke( - expr: Expression, - inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - val failOnError = failOnErrorFromInvoke(expr) - val urlArgs = dropEvaluatorLiteral(expr.children) - toProto(expr, urlArgs, failOnError, inputs, binding) - } } trait CommonStringExprs { From 3b07a93c352ba5d779663206be34c6e4b48c530e Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Thu, 5 Mar 2026 18:46:56 +0100 Subject: [PATCH 8/8] refactor: simplify CometInvokeExpressionSerde by removing type parameter Use Expression directly instead of a generic type parameter T <: Expression. This eliminates the asInstanceOf cast in convertFromInvokeUnchecked and the unchecked method itself, since QueryPlanSerde can now call convertFromInvoke directly without existential type issues. --- .../serde/CometInvokeExpressionSerde.scala | 18 +++--------------- .../apache/comet/serde/QueryPlanSerde.scala | 4 ++-- .../scala/org/apache/comet/serde/strings.scala | 4 ++-- 3 files changed, 7 insertions(+), 19 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala b/spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala index 1f95c2e2f8..7ab82bfa6e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/CometInvokeExpressionSerde.scala @@ -35,11 +35,11 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} * first `Literal(_, ObjectType)` child of the `Invoke` node. This is the key used by * [[QueryPlanSerde]] to route the node to the correct handler. * - [[convertFromInvoke]] - the actual serialization logic, receiving the raw `Invoke` - * expression (unerased, with all children including the evaluator literal). + * expression (with all children including the evaluator literal). * * To register a new handler, add the object to [[QueryPlanSerde.invokeSerdeByTargetClassName]]. */ -trait CometInvokeExpressionSerde[T <: Expression] { +trait CometInvokeExpressionSerde { /** * Fully-qualified class name of the private evaluator object held in the first child @@ -63,19 +63,7 @@ trait CometInvokeExpressionSerde[T <: Expression] { * responsible for calling `withInfo` to record the reason). */ def convertFromInvoke( - expr: T, - inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] - - /** - * Unchecked entry point used by [[QueryPlanSerde]] to dispatch through an existential - * `CometInvokeExpressionSerde[_]`. Casts `expr` to `T` before delegating to - * [[convertFromInvoke]]. Only call this when you have already verified (via - * [[invokeTargetClassName]]) that the handler owns this `Invoke` node. - */ - final def convertFromInvokeUnchecked( expr: Expression, inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] = - convertFromInvoke(expr.asInstanceOf[T], inputs, binding) + binding: Boolean): Option[ExprOuterClass.Expr] } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index c15b51f4fe..174e358965 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -49,7 +49,7 @@ object QueryPlanSerde extends Logging with CometExprShim { // To support a new RuntimeReplaceable expression rewritten to Invoke in Spark 4.0, implement // CometInvokeExpressionSerde and add the object here. private val invokeSerdeByTargetClassName - : Map[String, CometInvokeExpressionSerde[_ <: Expression]] = + : Map[String, CometInvokeExpressionSerde] = Seq(CometParseUrl).map(s => s.invokeTargetClassName -> s).toMap // Extracts the target object class name from an Invoke-like expression. @@ -70,7 +70,7 @@ object QueryPlanSerde extends Logging with CometExprShim { for { targetClassName <- invokeTargetClassName(expr) handler <- invokeSerdeByTargetClassName.get(targetClassName) - result <- handler.convertFromInvokeUnchecked(expr, inputs, binding) + result <- handler.convertFromInvoke(expr, inputs, binding) } yield result private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 88051a729e..44de28cce5 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -385,7 +385,7 @@ object CometStringSplit extends CometExpressionSerde[StringSplit] { object CometParseUrl extends CometExpressionSerde[ParseUrl] - with CometInvokeExpressionSerde[ParseUrl] { + with CometInvokeExpressionSerde { // In Spark 4.0, ParseUrl became RuntimeReplaceable and the analyser rewrites it to // Invoke(ParseUrlEvaluator.evaluate, url, part[, key]). The first child is a @@ -405,7 +405,7 @@ object CometParseUrl } override def convertFromInvoke( - expr: ParseUrl, + expr: Expression, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { // The first child is Literal(evaluator, ObjectType(ParseUrlEvaluator)).