From 288a2f56140b5534ee989fd22f5d980819581db0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 11 Jul 2025 07:52:04 -0600 Subject: [PATCH 01/12] refactor arithmetic serde --- .../apache/comet/serde/QueryPlanSerde.scala | 199 +------------ .../org/apache/comet/serde/arithmetic.scala | 261 ++++++++++++++++++ 2 files changed, 267 insertions(+), 193 deletions(-) create mode 100644 spark/src/main/scala/org/apache/comet/serde/arithmetic.scala diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 43c03d6b4f..8804b5ac2a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -23,7 +23,6 @@ import java.util.Locale import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer -import scala.math.min import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ @@ -68,6 +67,12 @@ object QueryPlanSerde extends Logging with CometExprShim { * Mapping of Spark expression class to Comet expression handler. */ private val exprSerdeMap: Map[Class[_], CometExpressionSerde] = Map( + classOf[Add] -> CometAdd, + classOf[Subtract] -> CometSubtract, + classOf[Multiply] -> CometMultiply, + classOf[Divide] -> CometDivide, + classOf[IntegralDivide] -> CometIntegralDivide, + classOf[Remainder] -> CometRemainder, classOf[ArrayAppend] -> CometArrayAppend, classOf[ArrayContains] -> CometArrayContains, classOf[ArrayDistinct] -> CometArrayDistinct, @@ -631,143 +636,6 @@ object QueryPlanSerde extends Logging with CometExprShim { case c @ Cast(child, dt, timeZoneId, _) => handleCast(expr, child, inputs, binding, dt, timeZoneId, evalMode(c)) - case add @ Add(left, right, _) if supportedDataType(left.dataType) => - createMathExpression( - expr, - left, - right, - inputs, - binding, - add.dataType, - add.evalMode == EvalMode.ANSI, - (builder, mathExpr) => builder.setAdd(mathExpr)) - - case add @ Add(left, _, _) if !supportedDataType(left.dataType) => - withInfo(add, s"Unsupported datatype ${left.dataType}") - None - - case sub @ Subtract(left, right, _) if supportedDataType(left.dataType) => - createMathExpression( - expr, - left, - right, - inputs, - binding, - sub.dataType, - sub.evalMode == EvalMode.ANSI, - (builder, mathExpr) => builder.setSubtract(mathExpr)) - - case sub @ Subtract(left, _, _) if !supportedDataType(left.dataType) => - withInfo(sub, s"Unsupported datatype ${left.dataType}") - None - - case mul @ Multiply(left, right, _) if supportedDataType(left.dataType) => - createMathExpression( - expr, - left, - right, - inputs, - binding, - mul.dataType, - mul.evalMode == EvalMode.ANSI, - (builder, mathExpr) => builder.setMultiply(mathExpr)) - - case mul @ Multiply(left, _, _) => - if (!supportedDataType(left.dataType)) { - withInfo(mul, s"Unsupported datatype ${left.dataType}") - } - None - - case div @ Divide(left, right, _) if supportedDataType(left.dataType) => - // Datafusion now throws an exception for dividing by zero - // See https://github.com/apache/arrow-datafusion/pull/6792 - // For now, use NullIf to swap zeros with nulls. 
- val rightExpr = nullIfWhenPrimitive(right) - - createMathExpression( - expr, - left, - rightExpr, - inputs, - binding, - div.dataType, - div.evalMode == EvalMode.ANSI, - (builder, mathExpr) => builder.setDivide(mathExpr)) - - case div @ Divide(left, _, _) => - if (!supportedDataType(left.dataType)) { - withInfo(div, s"Unsupported datatype ${left.dataType}") - } - None - - case div @ IntegralDivide(left, right, _) if supportedDataType(left.dataType) => - val rightExpr = nullIfWhenPrimitive(right) - - val dataType = (left.dataType, right.dataType) match { - case (l: DecimalType, r: DecimalType) => - // copy from IntegralDivide.resultDecimalType - val intDig = l.precision - l.scale + r.scale - DecimalType(min(if (intDig == 0) 1 else intDig, DecimalType.MAX_PRECISION), 0) - case _ => left.dataType - } - - val divideExpr = createMathExpression( - expr, - left, - rightExpr, - inputs, - binding, - dataType, - div.evalMode == EvalMode.ANSI, - (builder, mathExpr) => builder.setIntegralDivide(mathExpr)) - - if (divideExpr.isDefined) { - val childExpr = if (dataType.isInstanceOf[DecimalType]) { - // check overflow for decimal type - val builder = ExprOuterClass.CheckOverflow.newBuilder() - builder.setChild(divideExpr.get) - builder.setFailOnError(div.evalMode == EvalMode.ANSI) - builder.setDatatype(serializeDataType(dataType).get) - Some( - ExprOuterClass.Expr - .newBuilder() - .setCheckOverflow(builder) - .build()) - } else { - divideExpr - } - - // cast result to long - castToProto(expr, None, LongType, childExpr.get, CometEvalMode.LEGACY) - } else { - None - } - - case div @ IntegralDivide(left, _, _) => - if (!supportedDataType(left.dataType)) { - withInfo(div, s"Unsupported datatype ${left.dataType}") - } - None - - case rem @ Remainder(left, right, _) if supportedDataType(left.dataType) => - val rightExpr = nullIfWhenPrimitive(right) - - createMathExpression( - expr, - left, - rightExpr, - inputs, - binding, - rem.dataType, - rem.evalMode == EvalMode.ANSI, - (builder, mathExpr) => builder.setRemainder(mathExpr)) - - case rem @ Remainder(left, _, _) => - if (!supportedDataType(left.dataType)) { - withInfo(rem, s"Unsupported datatype ${left.dataType}") - } - None - case EqualTo(left, right) => createBinaryExpr( expr, @@ -1950,42 +1818,6 @@ object QueryPlanSerde extends Logging with CometExprShim { } } - private def createMathExpression( - expr: Expression, - left: Expression, - right: Expression, - inputs: Seq[Attribute], - binding: Boolean, - dataType: DataType, - failOnError: Boolean, - f: (ExprOuterClass.Expr.Builder, ExprOuterClass.MathExpr) => ExprOuterClass.Expr.Builder) - : Option[ExprOuterClass.Expr] = { - val leftExpr = exprToProtoInternal(left, inputs, binding) - val rightExpr = exprToProtoInternal(right, inputs, binding) - - if (leftExpr.isDefined && rightExpr.isDefined) { - // create the generic MathExpr message - val builder = ExprOuterClass.MathExpr.newBuilder() - builder.setLeft(leftExpr.get) - builder.setRight(rightExpr.get) - builder.setFailOnError(failOnError) - serializeDataType(dataType).foreach { t => - builder.setReturnType(t) - } - val inner = builder.build() - // call the user-supplied function to wrap MathExpr in a top-level Expr - // such as Expr.Add or Expr.Divide - Some( - f( - ExprOuterClass.Expr - .newBuilder(), - inner).build()) - } else { - withInfo(expr, left, right) - None - } - } - def in( expr: Expression, value: Expression, @@ -2041,25 +1873,6 @@ object QueryPlanSerde extends Logging with CometExprShim { 
Some(ExprOuterClass.Expr.newBuilder().setScalarFunc(builder).build()) } - private def isPrimitive(expression: Expression): Boolean = expression.dataType match { - case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | - _: DoubleType | _: TimestampType | _: DateType | _: BooleanType | _: DecimalType => - true - case _ => false - } - - private def nullIfWhenPrimitive(expression: Expression): Expression = - if (isPrimitive(expression)) { - val zero = Literal.default(expression.dataType) - expression match { - case _: Literal if expression != zero => expression - case _ => - If(EqualTo(expression, zero), Literal.create(null, expression.dataType), expression) - } - } else { - expression - } - private def nullIfNegative(expression: Expression): Expression = { val zero = Literal.default(expression.dataType) If(LessThanOrEqual(expression, zero), Literal.create(null, expression.dataType), expression) diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala new file mode 100644 index 0000000000..a009695c69 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.serde + +import scala.math.min + +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, Divide, EqualTo, EvalMode, Expression, If, IntegralDivide, Literal, Multiply, Remainder, Subtract} +import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, TimestampType} + +import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.QueryPlanSerde.{castToProto, exprToProtoInternal, serializeDataType, supportedDataType} + +trait MathBase { + def createMathExpression( + expr: Expression, + left: Expression, + right: Expression, + inputs: Seq[Attribute], + binding: Boolean, + dataType: DataType, + failOnError: Boolean, + f: (ExprOuterClass.Expr.Builder, ExprOuterClass.MathExpr) => ExprOuterClass.Expr.Builder) + : Option[ExprOuterClass.Expr] = { + val leftExpr = exprToProtoInternal(left, inputs, binding) + val rightExpr = exprToProtoInternal(right, inputs, binding) + + if (leftExpr.isDefined && rightExpr.isDefined) { + // create the generic MathExpr message + val builder = ExprOuterClass.MathExpr.newBuilder() + builder.setLeft(leftExpr.get) + builder.setRight(rightExpr.get) + builder.setFailOnError(failOnError) + serializeDataType(dataType).foreach { t => + builder.setReturnType(t) + } + val inner = builder.build() + // call the user-supplied function to wrap MathExpr in a top-level Expr + // such as Expr.Add or Expr.Divide + Some( + f( + ExprOuterClass.Expr + .newBuilder(), + inner).build()) + } else { + withInfo(expr, left, right) + None + } + } + + def nullIfWhenPrimitive(expression: Expression): Expression = + if (isPrimitive(expression)) { + val zero = Literal.default(expression.dataType) + expression match { + case _: Literal if expression != zero => expression + case _ => + If(EqualTo(expression, zero), Literal.create(null, expression.dataType), expression) + } + } else { + expression + } + + private def isPrimitive(expression: Expression): Boolean = expression.dataType match { + case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | + _: DoubleType | _: TimestampType | _: DateType | _: BooleanType | _: DecimalType => + true + case _ => false + } + +} + +object CometAdd extends CometExpressionSerde with MathBase { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val add = expr.asInstanceOf[Add] + if (!supportedDataType(add.left.dataType)) { + withInfo(add, s"Unsupported datatype ${add.left.dataType}") + return None + } + createMathExpression( + expr, + add.left, + add.right, + inputs, + binding, + add.dataType, + add.evalMode == EvalMode.ANSI, + (builder, mathExpr) => builder.setAdd(mathExpr)) + } +} + +object CometSubtract extends CometExpressionSerde with MathBase { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val add = expr.asInstanceOf[Subtract] + if (!supportedDataType(add.left.dataType)) { + withInfo(add, s"Unsupported datatype ${add.left.dataType}") + return None + } + createMathExpression( + expr, + add.left, + add.right, + inputs, + binding, + add.dataType, + add.evalMode == EvalMode.ANSI, + (builder, mathExpr) => builder.setSubtract(mathExpr)) + } +} + +object CometMultiply extends CometExpressionSerde with MathBase { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + 
binding: Boolean): Option[ExprOuterClass.Expr] = { + val mul = expr.asInstanceOf[Multiply] + if (!supportedDataType(mul.left.dataType)) { + withInfo(mul, s"Unsupported datatype ${mul.left.dataType}") + return None + } + createMathExpression( + expr, + mul.left, + mul.right, + inputs, + binding, + mul.dataType, + mul.evalMode == EvalMode.ANSI, + (builder, mathExpr) => builder.setMultiply(mathExpr)) + } +} + +object CometDivide extends CometExpressionSerde with MathBase { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val div = expr.asInstanceOf[Divide] + + // Datafusion now throws an exception for dividing by zero + // See https://github.com/apache/arrow-datafusion/pull/6792 + // For now, use NullIf to swap zeros with nulls. + val rightExpr = nullIfWhenPrimitive(div.right) + + if (!supportedDataType(div.left.dataType)) { + withInfo(div, s"Unsupported datatype ${div.left.dataType}") + return None + } + createMathExpression( + expr, + div.left, + rightExpr, + inputs, + binding, + div.dataType, + div.evalMode == EvalMode.ANSI, + (builder, mathExpr) => builder.setDivide(mathExpr)) + } +} + +object CometIntegralDivide extends CometExpressionSerde with MathBase { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val div = expr.asInstanceOf[IntegralDivide] + val rightExpr = nullIfWhenPrimitive(div.right) + + if (!supportedDataType(div.left.dataType)) { + withInfo(div, s"Unsupported datatype ${div.left.dataType}") + return None + } + + val dataType = (div.left.dataType, div.right.dataType) match { + case (l: DecimalType, r: DecimalType) => + // copy from IntegralDivide.resultDecimalType + val intDig = l.precision - l.scale + r.scale + DecimalType(min(if (intDig == 0) 1 else intDig, DecimalType.MAX_PRECISION), 0) + case _ => div.left.dataType + } + + val divideExpr = createMathExpression( + expr, + div.left, + rightExpr, + inputs, + binding, + div.dataType, + div.evalMode == EvalMode.ANSI, + (builder, mathExpr) => builder.setIntegralDivide(mathExpr)) + + if (divideExpr.isDefined) { + val childExpr = if (dataType.isInstanceOf[DecimalType]) { + // check overflow for decimal type + val builder = ExprOuterClass.CheckOverflow.newBuilder() + builder.setChild(divideExpr.get) + builder.setFailOnError(div.evalMode == EvalMode.ANSI) + builder.setDatatype(serializeDataType(dataType).get) + Some( + ExprOuterClass.Expr + .newBuilder() + .setCheckOverflow(builder) + .build()) + } else { + divideExpr + } + + // cast result to long + castToProto(expr, None, LongType, childExpr.get, CometEvalMode.LEGACY) + } else { + None + } + } +} + +object CometRemainder extends CometExpressionSerde with MathBase { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val remainder = expr.asInstanceOf[Remainder] + if (!supportedDataType(remainder.left.dataType)) { + withInfo(remainder, s"Unsupported datatype ${remainder.left.dataType}") + return None + } + + val rightExpr = nullIfWhenPrimitive(remainder) + + createMathExpression( + expr, + remainder.left, + rightExpr, + inputs, + binding, + remainder.dataType, + remainder.evalMode == EvalMode.ANSI, + (builder, mathExpr) => builder.setRemainder(mathExpr)) + } +} From 0fb92fc2e7d30bd3bc17d1710c079b2369710e34 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 11 Jul 2025 08:23:43 -0600 Subject: [PATCH 02/12] fix --- .../org/apache/comet/serde/arithmetic.scala | 
16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala index a009695c69..409756a5d6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -113,19 +113,19 @@ object CometSubtract extends CometExpressionSerde with MathBase { expr: Expression, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - val add = expr.asInstanceOf[Subtract] - if (!supportedDataType(add.left.dataType)) { - withInfo(add, s"Unsupported datatype ${add.left.dataType}") + val sub = expr.asInstanceOf[Subtract] + if (!supportedDataType(sub.left.dataType)) { + withInfo(sub, s"Unsupported datatype ${sub.left.dataType}") return None } createMathExpression( expr, - add.left, - add.right, + sub.left, + sub.right, inputs, binding, - add.dataType, - add.evalMode == EvalMode.ANSI, + sub.dataType, + sub.evalMode == EvalMode.ANSI, (builder, mathExpr) => builder.setSubtract(mathExpr)) } } @@ -246,7 +246,7 @@ object CometRemainder extends CometExpressionSerde with MathBase { return None } - val rightExpr = nullIfWhenPrimitive(remainder) + val rightExpr = nullIfWhenPrimitive(remainder.right) createMathExpression( expr, From b1b085221e1468643f87de929d1e81669e400c2f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 11 Jul 2025 11:16:01 -0600 Subject: [PATCH 03/12] fix --- spark/src/main/scala/org/apache/comet/serde/arithmetic.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala index 409756a5d6..f88e798644 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -207,7 +207,7 @@ object CometIntegralDivide extends CometExpressionSerde with MathBase { rightExpr, inputs, binding, - div.dataType, + dataType, div.evalMode == EvalMode.ANSI, (builder, mathExpr) => builder.setIntegralDivide(mathExpr)) From 7f73c704d6b184f206d92c60c9612ae610541a81 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 11 Jul 2025 12:29:33 -0600 Subject: [PATCH 04/12] fall back to Spark for TRY and ANSI eval modes --- native/core/src/execution/planner.rs | 120 +++++++++++------- native/proto/src/proto/expr.proto | 14 +- .../org/apache/comet/serde/arithmetic.scala | 43 +++++-- .../apache/comet/CometExpressionSuite.scala | 12 +- 4 files changed, 127 insertions(+), 62 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 7003b2651b..e69d6c1a61 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -230,51 +230,81 @@ impl PhysicalPlanner { input_schema: SchemaRef, ) -> Result, ExecutionError> { match spark_expr.expr_struct.as_ref().unwrap() { - ExprStruct::Add(expr) => self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Plus, - input_schema, - ), - ExprStruct::Subtract(expr) => self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Minus, - input_schema, - ), - ExprStruct::Multiply(expr) => self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - 
DataFusionOperator::Multiply, - input_schema, - ), - ExprStruct::Divide(expr) => self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Divide, - input_schema, - ), - ExprStruct::IntegralDivide(expr) => self.create_binary_expr_with_options( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Divide, - input_schema, - BinaryExprOptions { - is_integral_div: true, - }, - ), - ExprStruct::Remainder(expr) => self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Modulo, - input_schema, - ), + ExprStruct::Add(expr) => { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + // TODO add support for other eval modes + assert!(eval_mode == EvalMode::Legacy); + self.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Plus, + input_schema, + ) + } + ExprStruct::Subtract(expr) => { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + // TODO add support for other eval modes + assert!(eval_mode == EvalMode::Legacy); + self.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Minus, + input_schema, + ) + } + ExprStruct::Multiply(expr) => { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + // TODO add support for other eval modes + assert!(eval_mode == EvalMode::Legacy); + self.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Multiply, + input_schema, + ) + } + ExprStruct::Divide(expr) => { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + // TODO add support for other eval modes + assert!(eval_mode == EvalMode::Legacy); + self.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Divide, + input_schema, + ) + } + ExprStruct::IntegralDivide(expr) => { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + // TODO add support for other eval modes + assert!(eval_mode == EvalMode::Legacy); + self.create_binary_expr_with_options( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Divide, + input_schema, + BinaryExprOptions { + is_integral_div: true, + }, + ) + } + ExprStruct::Remainder(expr) => { + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + // TODO add support for other eval modes + assert!(eval_mode == EvalMode::Legacy); + self.create_binary_expr( + expr.left.as_ref().unwrap(), + expr.right.as_ref().unwrap(), + expr.return_type.as_ref(), + DataFusionOperator::Modulo, + input_schema, + ) + } ExprStruct::Eq(expr) => { let left = self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 8f4c875eec..3daa304736 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -219,19 +219,19 @@ message Literal { bool is_null = 12; } -message MathExpr { - Expr left = 1; - Expr right = 2; - bool fail_on_error = 3; - DataType return_type = 4; -} - enum EvalMode { LEGACY = 0; TRY = 1; ANSI = 2; } +message MathExpr { + Expr left = 1; + Expr right = 2; + DataType return_type = 4; + EvalMode eval_mode = 5; +} + message 
Cast { Expr child = 1; DataType datatype = 2; diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala index f88e798644..b7abfd7c94 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, De import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.CometEvalMode -import org.apache.comet.serde.QueryPlanSerde.{castToProto, exprToProtoInternal, serializeDataType, supportedDataType} +import org.apache.comet.serde.QueryPlanSerde.{castToProto, evalModeToProto, exprToProtoInternal, serializeDataType, supportedDataType} +import org.apache.comet.shims.CometEvalModeUtil trait MathBase { def createMathExpression( @@ -36,7 +37,7 @@ trait MathBase { inputs: Seq[Attribute], binding: Boolean, dataType: DataType, - failOnError: Boolean, + evalMode: EvalMode.Value, f: (ExprOuterClass.Expr.Builder, ExprOuterClass.MathExpr) => ExprOuterClass.Expr.Builder) : Option[ExprOuterClass.Expr] = { val leftExpr = exprToProtoInternal(left, inputs, binding) @@ -47,7 +48,7 @@ trait MathBase { val builder = ExprOuterClass.MathExpr.newBuilder() builder.setLeft(leftExpr.get) builder.setRight(rightExpr.get) - builder.setFailOnError(failOnError) + builder.setEvalMode(evalModeToProto(CometEvalModeUtil.fromSparkEvalMode(evalMode))) serializeDataType(dataType).foreach { t => builder.setReturnType(t) } @@ -96,6 +97,10 @@ object CometAdd extends CometExpressionSerde with MathBase { withInfo(add, s"Unsupported datatype ${add.left.dataType}") return None } + if (add.evalMode != EvalMode.LEGACY) { + withInfo(add, s"Eval mode ${add.evalMode} is not supported") + return None + } createMathExpression( expr, add.left, @@ -103,7 +108,7 @@ object CometAdd extends CometExpressionSerde with MathBase { inputs, binding, add.dataType, - add.evalMode == EvalMode.ANSI, + add.evalMode, (builder, mathExpr) => builder.setAdd(mathExpr)) } } @@ -118,6 +123,10 @@ object CometSubtract extends CometExpressionSerde with MathBase { withInfo(sub, s"Unsupported datatype ${sub.left.dataType}") return None } + if (sub.evalMode != EvalMode.LEGACY) { + withInfo(sub, s"Eval mode ${sub.evalMode} is not supported") + return None + } createMathExpression( expr, sub.left, @@ -125,7 +134,7 @@ object CometSubtract extends CometExpressionSerde with MathBase { inputs, binding, sub.dataType, - sub.evalMode == EvalMode.ANSI, + sub.evalMode, (builder, mathExpr) => builder.setSubtract(mathExpr)) } } @@ -140,6 +149,10 @@ object CometMultiply extends CometExpressionSerde with MathBase { withInfo(mul, s"Unsupported datatype ${mul.left.dataType}") return None } + if (mul.evalMode != EvalMode.LEGACY) { + withInfo(mul, s"Eval mode ${mul.evalMode} is not supported") + return None + } createMathExpression( expr, mul.left, @@ -147,7 +160,7 @@ object CometMultiply extends CometExpressionSerde with MathBase { inputs, binding, mul.dataType, - mul.evalMode == EvalMode.ANSI, + mul.evalMode, (builder, mathExpr) => builder.setMultiply(mathExpr)) } } @@ -168,6 +181,10 @@ object CometDivide extends CometExpressionSerde with MathBase { withInfo(div, s"Unsupported datatype ${div.left.dataType}") return None } + if (div.evalMode != EvalMode.LEGACY) { + withInfo(div, s"Eval mode ${div.evalMode} is not supported") + return None + } createMathExpression( expr, div.left, @@ -175,7 +192,7 @@ object 
CometDivide extends CometExpressionSerde with MathBase { inputs, binding, div.dataType, - div.evalMode == EvalMode.ANSI, + div.evalMode, (builder, mathExpr) => builder.setDivide(mathExpr)) } } @@ -192,6 +209,10 @@ object CometIntegralDivide extends CometExpressionSerde with MathBase { withInfo(div, s"Unsupported datatype ${div.left.dataType}") return None } + if (div.evalMode != EvalMode.LEGACY) { + withInfo(div, s"Eval mode ${div.evalMode} is not supported") + return None + } val dataType = (div.left.dataType, div.right.dataType) match { case (l: DecimalType, r: DecimalType) => @@ -208,7 +229,7 @@ object CometIntegralDivide extends CometExpressionSerde with MathBase { inputs, binding, dataType, - div.evalMode == EvalMode.ANSI, + div.evalMode, (builder, mathExpr) => builder.setIntegralDivide(mathExpr)) if (divideExpr.isDefined) { @@ -245,6 +266,10 @@ object CometRemainder extends CometExpressionSerde with MathBase { withInfo(remainder, s"Unsupported datatype ${remainder.left.dataType}") return None } + if (remainder.evalMode != EvalMode.LEGACY) { + withInfo(remainder, s"Eval mode ${remainder.evalMode} is not supported") + return None + } val rightExpr = nullIfWhenPrimitive(remainder.right) @@ -255,7 +280,7 @@ object CometRemainder extends CometExpressionSerde with MathBase { inputs, binding, remainder.dataType, - remainder.evalMode == EvalMode.ANSI, + remainder.evalMode, (builder, mathExpr) => builder.setRemainder(mathExpr)) } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index a30d25f87e..9725313202 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -302,10 +302,20 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("try_add") { + // TODO: we need to implement more comprehensive tests for all try_ arithmetic functions + // https://github.com/apache/datafusion-comet/issues/2021 + val data = Seq((Integer.MAX_VALUE, 1)) + withParquetTable(data, "tbl") { + checkSparkAnswer("SELECT try_add(_1, _2) FROM tbl") + } + } + test("dictionary arithmetic") { // TODO: test ANSI mode withSQLConf(SQLConf.ANSI_ENABLED.key -> "false", "parquet.enable.dictionary" -> "true") { - withParquetTable((0 until 10).map(i => (i % 5, i % 3)), "tbl") { + val data = Seq((Integer.MAX_VALUE, 1)) + withParquetTable(data, "tbl") { checkSparkAnswerAndOperator("SELECT _1 + _2, _1 - _2, _1 * _2, _1 / _2, _1 % _2 FROM tbl") } } From bbd6cd929daa7650102680b37b426a30ce50e048 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 11 Jul 2025 12:33:58 -0600 Subject: [PATCH 05/12] fix merge conflict --- native/core/src/execution/planner.rs | 53 ++-------------------------- 1 file changed, 2 insertions(+), 51 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 77bc50420c..093f3e5060 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -229,7 +229,6 @@ impl PhysicalPlanner { input_schema: SchemaRef, ) -> Result, ExecutionError> { match spark_expr.expr_struct.as_ref().unwrap() { -<<<<<<< HEAD ExprStruct::Add(expr) => { let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; // TODO add support for other eval modes @@ -296,54 +295,7 @@ impl PhysicalPlanner { ExprStruct::Remainder(expr) => { let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; // TODO add support for other eval modes - 
assert!(eval_mode == EvalMode::Legacy); - self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Modulo, - input_schema, - ) -======= - ExprStruct::Add(expr) => self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Plus, - input_schema, - ), - ExprStruct::Subtract(expr) => self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Minus, - input_schema, - ), - ExprStruct::Multiply(expr) => self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Multiply, - input_schema, - ), - ExprStruct::Divide(expr) => self.create_binary_expr( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Divide, - input_schema, - ), - ExprStruct::IntegralDivide(expr) => self.create_binary_expr_with_options( - expr.left.as_ref().unwrap(), - expr.right.as_ref().unwrap(), - expr.return_type.as_ref(), - DataFusionOperator::Divide, - input_schema, - BinaryExprOptions { - is_integral_div: true, - }, - ), - ExprStruct::Remainder(expr) => { + assert!(eval_mode == EvalMode::Try); let left = self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = @@ -354,11 +306,10 @@ impl PhysicalPlanner { right, expr.return_type.as_ref().map(to_arrow_datatype).unwrap(), input_schema, - expr.fail_on_error, + eval_mode == EvalMode::Ansi, &self.session_ctx.state(), ); result.map_err(|e| GeneralError(e.to_string())) ->>>>>>> apache/main } ExprStruct::Eq(expr) => { let left = From a550435feb1cb6803967c13c39a011c116caffd6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 11 Jul 2025 12:54:06 -0600 Subject: [PATCH 06/12] fix --- docs/source/user-guide/compatibility.md | 228 +++++++++--------- native/core/src/execution/planner.rs | 4 +- .../org/apache/comet/serde/arithmetic.scala | 14 +- .../apache/comet/CometExpressionSuite.scala | 3 +- 4 files changed, 130 insertions(+), 119 deletions(-) diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md index 84c4aab0e9..5f0842eb4d 100644 --- a/docs/source/user-guide/compatibility.md +++ b/docs/source/user-guide/compatibility.md @@ -38,19 +38,15 @@ Comet does not support reading decimals encoded in binary format. ### Parquet Scans Comet currently has three distinct implementations of the Parquet scan operator. The configuration property -`spark.comet.scan.impl` is used to select an implementation. The default setting is `spark.comet.scan.impl=auto`, and -Comet will choose the most appropriate implementation based on the Parquet schema and other Comet configuration -settings. Most users should not need to change this setting. However, it is possible to force Comet to try and use -a particular implementation for all scan operations by setting this configuration property to one of the following -implementations. - -| Implementation | Description | -| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| `native_comet` | This implementation provides strong compatibility with Spark but does not support complex types. This is the original scan implementation in Comet and may eventually be removed. 
| -| `native_iceberg_compat` | This implementation delegates to DataFusion's `DataSourceExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future. | -| `native_datafusion` | This experimental implementation delegates to DataFusion's `DataSourceExec` for full native execution. There are known compatibility issues when using this scan. | - -The `native_datafusion` and `native_iceberg_compat` scans provide the following benefits over the `native_comet` +`spark.comet.scan.impl` is used to select an implementation. + +| Implementation | Description | +| ----------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `native_comet` | This is the default implementation. It provides strong compatibility with Spark but does not support complex types. | +| `native_datafusion` | This implementation delegates to DataFusion's `DataSourceExec`. | +| `native_iceberg_compat` | This implementation also delegates to DataFusion's `DataSourceExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future. | + +The new (and currently experimental) `native_datafusion` and `native_iceberg_compat` scans provide the following benefits over the `native_comet` implementation: - Leverages the DataFusion community's ongoing improvements to `DataSourceExec` @@ -58,28 +54,36 @@ implementation: - Removes the use of reusable mutable-buffers in Comet, which is complex to maintain - Improves performance -The `native_datafusion` and `native_iceberg_compat` scans share the following limitations: +The new scans currently have the following limitations: + +Issues common to both `native_datafusion` and `native_iceberg_compat`: - When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8` - or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these - logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned - rather than signed. By default, Comet will fall back to `native_comet` when scanning Parquet files containing `byte` or `short` - types (regardless of the logical type). This behavior can be disabled by setting - `spark.comet.scan.allowIncompatible=true`. +or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these +logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned +rather than signed. By default, Comet will fall back to Spark when scanning Parquet files containing `byte` or `short` +types (regardless of the logical type). This behavior can be disabled by setting +`spark.comet.scan.allowIncompatible=true`. +- There is a known performance issue when pushing filters down to Parquet. See the [Comet Tuning Guide] for more +information. +- Reading maps containing complex types can result in errors or incorrect results [#1754] +- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758] +- There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]). - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported. 
+- Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with `native_datafusion` scan. -The `native_datafusion` scan has some additional limitations: +Issues specific to `native_datafusion`: - Bucketed scans are not supported - No support for row indexes -- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758] -- There are failures in the Spark SQL test suite [#1545] -- Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with Spark [#1545]: https://github.com/apache/datafusion-comet/issues/1545 +[#1542]: https://github.com/apache/datafusion-comet/issues/1542 +[#1754]: https://github.com/apache/datafusion-comet/issues/1754 [#1758]: https://github.com/apache/datafusion-comet/issues/1758 +[Comet Tuning Guide]: tuning.md -## ANSI Mode +## ANSI mode Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default, Comet will fall back to Spark if ANSI mode is enabled. To enable Comet to accelerate queries when ANSI mode is enabled, @@ -88,7 +92,7 @@ be used in production. There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support. -## Floating-point Number Comparison +## Floating number comparison Spark normalizes NaN and zero for floating point numbers for several cases. See `NormalizeFloatingNumbers` optimization rule in Spark. However, one exception is comparison. Spark does not normalize NaN and zero when comparing values @@ -121,104 +125,104 @@ Cast operations in Comet fall into three levels of support: - **Compatible**: The results match Apache Spark - **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs - will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting - `spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not - recommended for production use. +will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting +`spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not +recommended for production use. - **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to - Spark. +Spark. ### Compatible Casts The following cast operations are generally compatible with Spark except for the differences noted here. 
-| From Type | To Type | Notes | -| --------- | ------- | --------------------------------------------------------------------------------------------------------------- | -| boolean | byte | | -| boolean | short | | -| boolean | integer | | -| boolean | long | | -| boolean | float | | -| boolean | double | | -| boolean | string | | -| byte | boolean | | -| byte | short | | -| byte | integer | | -| byte | long | | -| byte | float | | -| byte | double | | -| byte | decimal | | -| byte | string | | -| short | boolean | | -| short | byte | | -| short | integer | | -| short | long | | -| short | float | | -| short | double | | -| short | decimal | | -| short | string | | -| integer | boolean | | -| integer | byte | | -| integer | short | | -| integer | long | | -| integer | float | | -| integer | double | | -| integer | string | | -| long | boolean | | -| long | byte | | -| long | short | | -| long | integer | | -| long | float | | -| long | double | | -| long | string | | -| float | boolean | | -| float | byte | | -| float | short | | -| float | integer | | -| float | long | | -| float | double | | -| float | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | -| double | boolean | | -| double | byte | | -| double | short | | -| double | integer | | -| double | long | | -| double | float | | -| double | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | -| decimal | byte | | -| decimal | short | | -| decimal | integer | | -| decimal | long | | -| decimal | float | | -| decimal | double | | -| decimal | decimal | | -| decimal | string | There can be formatting differences in some case due to Spark using scientific notation where Comet does not | -| string | boolean | | -| string | byte | | -| string | short | | -| string | integer | | -| string | long | | -| string | binary | | -| string | date | Only supports years between 262143 BC and 262142 AD | -| date | string | | -| timestamp | long | | -| timestamp | string | | -| timestamp | date | | +| From Type | To Type | Notes | +|-|-|-| +| boolean | byte | | +| boolean | short | | +| boolean | integer | | +| boolean | long | | +| boolean | float | | +| boolean | double | | +| boolean | string | | +| byte | boolean | | +| byte | short | | +| byte | integer | | +| byte | long | | +| byte | float | | +| byte | double | | +| byte | decimal | | +| byte | string | | +| short | boolean | | +| short | byte | | +| short | integer | | +| short | long | | +| short | float | | +| short | double | | +| short | decimal | | +| short | string | | +| integer | boolean | | +| integer | byte | | +| integer | short | | +| integer | long | | +| integer | float | | +| integer | double | | +| integer | string | | +| long | boolean | | +| long | byte | | +| long | short | | +| long | integer | | +| long | float | | +| long | double | | +| long | string | | +| float | boolean | | +| float | byte | | +| float | short | | +| float | integer | | +| float | long | | +| float | double | | +| float | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | +| double | boolean | | +| double | byte | | +| double | short | | +| double | integer | | +| double | long | | +| double | float | | +| double | string | There can be differences in precision. 
For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | +| decimal | byte | | +| decimal | short | | +| decimal | integer | | +| decimal | long | | +| decimal | float | | +| decimal | double | | +| decimal | decimal | | +| decimal | string | There can be formatting differences in some case due to Spark using scientific notation where Comet does not | +| string | boolean | | +| string | byte | | +| string | short | | +| string | integer | | +| string | long | | +| string | binary | | +| string | date | Only supports years between 262143 BC and 262142 AD | +| date | string | | +| timestamp | long | | +| timestamp | string | | +| timestamp | date | | ### Incompatible Casts The following cast operations are not compatible with Spark for all inputs and are disabled by default. -| From Type | To Type | Notes | -| --------- | --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | -| integer | decimal | No overflow check | -| long | decimal | No overflow check | -| float | decimal | There can be rounding differences | -| double | decimal | There can be rounding differences | -| string | float | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. | -| string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. | -| string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits | -| string | timestamp | Not all valid formats are supported | -| binary | string | Only works for binary data representing valid UTF-8 strings | +| From Type | To Type | Notes | +|-|-|-| +| integer | decimal | No overflow check | +| long | decimal | No overflow check | +| float | decimal | There can be rounding differences | +| double | decimal | There can be rounding differences | +| string | float | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. | +| string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. | +| string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. 
Returns 0.0 instead of null if input contains no digits | +| string | timestamp | Not all valid formats are supported | +| binary | string | Only works for binary data representing valid UTF-8 strings | ### Unsupported Casts diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 093f3e5060..a45a3277bd 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -268,7 +268,7 @@ impl PhysicalPlanner { ExprStruct::Divide(expr) => { let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; // TODO add support for other eval modes - assert!(eval_mode == EvalMode::Legacy); + assert!(eval_mode != EvalMode::Try); self.create_binary_expr( expr.left.as_ref().unwrap(), expr.right.as_ref().unwrap(), @@ -295,7 +295,7 @@ impl PhysicalPlanner { ExprStruct::Remainder(expr) => { let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; // TODO add support for other eval modes - assert!(eval_mode == EvalMode::Try); + // assert!(eval_mode == EvalMode::Try); let left = self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; let right = diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala index 8c49354c9b..fcc57db128 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -22,11 +22,10 @@ package org.apache.comet.serde import scala.math.min import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, Divide, EqualTo, EvalMode, Expression, If, IntegralDivide, Literal, Multiply, Remainder, Subtract} -import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, TimestampType} - +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampNTZType, TimestampType} import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.CometEvalMode -import org.apache.comet.serde.QueryPlanSerde.{castToProto, evalModeToProto, exprToProtoInternal, serializeDataType, supportedDataType} +import org.apache.comet.serde.QueryPlanSerde.{castToProto, evalModeToProto, exprToProtoInternal, serializeDataType} import org.apache.comet.shims.CometEvalModeUtil trait MathBase { @@ -85,6 +84,15 @@ trait MathBase { case _ => false } + def supportedDataType(dt: DataType): Boolean = dt match { + case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | + _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType | + _: DecimalType | _: DateType | _: BooleanType | _: NullType => + true + case _ => + false + } + } object CometAdd extends CometExpressionSerde with MathBase { diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 0ba6ddd089..0b30575be1 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -314,8 +314,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("dictionary arithmetic") { // TODO: test ANSI mode withSQLConf(SQLConf.ANSI_ENABLED.key -> "false", "parquet.enable.dictionary" -> "true") { - val data = Seq((Integer.MAX_VALUE, 1)) - 
withParquetTable(data, "tbl") { + withParquetTable((0 until 10).map(i => (i % 5, i % 3)), "tbl") { checkSparkAnswerAndOperator("SELECT _1 + _2, _1 - _2, _1 * _2, _1 / _2, _1 % _2 FROM tbl") } } From 710277e23bede6855bc8b1b53fabea8ce452e1c6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 11 Jul 2025 12:54:42 -0600 Subject: [PATCH 07/12] fix --- docs/source/user-guide/compatibility.md | 228 ++++++++++++------------ 1 file changed, 112 insertions(+), 116 deletions(-) diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md index 5f0842eb4d..84c4aab0e9 100644 --- a/docs/source/user-guide/compatibility.md +++ b/docs/source/user-guide/compatibility.md @@ -38,15 +38,19 @@ Comet does not support reading decimals encoded in binary format. ### Parquet Scans Comet currently has three distinct implementations of the Parquet scan operator. The configuration property -`spark.comet.scan.impl` is used to select an implementation. - -| Implementation | Description | -| ----------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `native_comet` | This is the default implementation. It provides strong compatibility with Spark but does not support complex types. | -| `native_datafusion` | This implementation delegates to DataFusion's `DataSourceExec`. | -| `native_iceberg_compat` | This implementation also delegates to DataFusion's `DataSourceExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future. | - -The new (and currently experimental) `native_datafusion` and `native_iceberg_compat` scans provide the following benefits over the `native_comet` +`spark.comet.scan.impl` is used to select an implementation. The default setting is `spark.comet.scan.impl=auto`, and +Comet will choose the most appropriate implementation based on the Parquet schema and other Comet configuration +settings. Most users should not need to change this setting. However, it is possible to force Comet to try and use +a particular implementation for all scan operations by setting this configuration property to one of the following +implementations. + +| Implementation | Description | +| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| `native_comet` | This implementation provides strong compatibility with Spark but does not support complex types. This is the original scan implementation in Comet and may eventually be removed. | +| `native_iceberg_compat` | This implementation delegates to DataFusion's `DataSourceExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future. | +| `native_datafusion` | This experimental implementation delegates to DataFusion's `DataSourceExec` for full native execution. There are known compatibility issues when using this scan. 
| + +The `native_datafusion` and `native_iceberg_compat` scans provide the following benefits over the `native_comet` implementation: - Leverages the DataFusion community's ongoing improvements to `DataSourceExec` @@ -54,36 +58,28 @@ implementation: - Removes the use of reusable mutable-buffers in Comet, which is complex to maintain - Improves performance -The new scans currently have the following limitations: - -Issues common to both `native_datafusion` and `native_iceberg_compat`: +The `native_datafusion` and `native_iceberg_compat` scans share the following limitations: - When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8` -or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these -logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned -rather than signed. By default, Comet will fall back to Spark when scanning Parquet files containing `byte` or `short` -types (regardless of the logical type). This behavior can be disabled by setting -`spark.comet.scan.allowIncompatible=true`. -- There is a known performance issue when pushing filters down to Parquet. See the [Comet Tuning Guide] for more -information. -- Reading maps containing complex types can result in errors or incorrect results [#1754] -- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758] -- There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]). + or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these + logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned + rather than signed. By default, Comet will fall back to `native_comet` when scanning Parquet files containing `byte` or `short` + types (regardless of the logical type). This behavior can be disabled by setting + `spark.comet.scan.allowIncompatible=true`. - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported. -- Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with `native_datafusion` scan. -Issues specific to `native_datafusion`: +The `native_datafusion` scan has some additional limitations: - Bucketed scans are not supported - No support for row indexes +- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758] +- There are failures in the Spark SQL test suite [#1545] +- Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with Spark [#1545]: https://github.com/apache/datafusion-comet/issues/1545 -[#1542]: https://github.com/apache/datafusion-comet/issues/1542 -[#1754]: https://github.com/apache/datafusion-comet/issues/1754 [#1758]: https://github.com/apache/datafusion-comet/issues/1758 -[Comet Tuning Guide]: tuning.md -## ANSI mode +## ANSI Mode Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default, Comet will fall back to Spark if ANSI mode is enabled. To enable Comet to accelerate queries when ANSI mode is enabled, @@ -92,7 +88,7 @@ be used in production. There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support. 
 
-## Floating number comparison
+## Floating-point Number Comparison
 
 Spark normalizes NaN and zero for floating point numbers for several cases. See `NormalizeFloatingNumbers` optimization
 rule in Spark. However, one exception is comparison. Spark does not normalize NaN and zero when comparing values
@@ -125,104 +121,104 @@ Cast operations in Comet fall into three levels of support:
 
 - **Compatible**: The results match Apache Spark
 - **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
-will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
-`spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
-recommended for production use.
+  will result in incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
+  `spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
+  recommended for production use.
 - **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to
-Spark.
+  Spark.
 
 ### Compatible Casts
 
 The following cast operations are generally compatible with Spark except for the differences noted here.
 
-| From Type | To Type | Notes |
-|-|-|-|
-| boolean | byte | |
-| boolean | short | |
-| boolean | integer | |
-| boolean | long | |
-| boolean | float | |
-| boolean | double | |
-| boolean | string | |
-| byte | boolean | |
-| byte | short | |
-| byte | integer | |
-| byte | long | |
-| byte | float | |
-| byte | double | |
-| byte | decimal | |
-| byte | string | |
-| short | boolean | |
-| short | byte | |
-| short | integer | |
-| short | long | |
-| short | float | |
-| short | double | |
-| short | decimal | |
-| short | string | |
-| integer | boolean | |
-| integer | byte | |
-| integer | short | |
-| integer | long | |
-| integer | float | |
-| integer | double | |
-| integer | string | |
-| long | boolean | |
-| long | byte | |
-| long | short | |
-| long | integer | |
-| long | float | |
-| long | double | |
-| long | string | |
-| float | boolean | |
-| float | byte | |
-| float | short | |
-| float | integer | |
-| float | long | |
-| float | double | |
-| float | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
-| double | boolean | |
-| double | byte | |
-| double | short | |
-| double | integer | |
-| double | long | |
-| double | float | |
-| double | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
-| decimal | byte | |
-| decimal | short | |
-| decimal | integer | |
-| decimal | long | |
-| decimal | float | |
-| decimal | double | |
-| decimal | decimal | |
-| decimal | string | There can be formatting differences in some case due to Spark using scientific notation where Comet does not |
-| string | boolean | |
-| string | byte | |
-| string | short | |
-| string | integer | |
-| string | long | |
-| string | binary | |
-| string | date | Only supports years between 262143 BC and 262142 AD |
-| date | string | |
-| timestamp | long | |
-| timestamp | string | |
-| timestamp | date | |
+| From Type | To Type | Notes                                                                                                            |
+| --------- | ------- | ---------------------------------------------------------------------------------------------------------------- |
+| boolean   | byte    |                                                                                                                    |
+| boolean   | short   |                                                                                                                    |
+| boolean   | integer |                                                                                                                    |
+| boolean   | long    |                                                                                                                    |
+| boolean   | float   |                                                                                                                    |
+| boolean   | double  |                                                                                                                    |
+| boolean   | string  |                                                                                                                    |
+| byte      | boolean |                                                                                                                    |
+| byte      | short   |                                                                                                                    |
+| byte      | integer |                                                                                                                    |
+| byte      | long    |                                                                                                                    |
+| byte      | float   |                                                                                                                    |
+| byte      | double  |                                                                                                                    |
+| byte      | decimal |                                                                                                                    |
+| byte      | string  |                                                                                                                    |
+| short     | boolean |                                                                                                                    |
+| short     | byte    |                                                                                                                    |
+| short     | integer |                                                                                                                    |
+| short     | long    |                                                                                                                    |
+| short     | float   |                                                                                                                    |
+| short     | double  |                                                                                                                    |
+| short     | decimal |                                                                                                                    |
+| short     | string  |                                                                                                                    |
+| integer   | boolean |                                                                                                                    |
+| integer   | byte    |                                                                                                                    |
+| integer   | short   |                                                                                                                    |
+| integer   | long    |                                                                                                                    |
+| integer   | float   |                                                                                                                    |
+| integer   | double  |                                                                                                                    |
+| integer   | string  |                                                                                                                    |
+| long      | boolean |                                                                                                                    |
+| long      | byte    |                                                                                                                    |
+| long      | short   |                                                                                                                    |
+| long      | integer |                                                                                                                    |
+| long      | float   |                                                                                                                    |
+| long      | double  |                                                                                                                    |
+| long      | string  |                                                                                                                    |
+| float     | boolean |                                                                                                                    |
+| float     | byte    |                                                                                                                    |
+| float     | short   |                                                                                                                    |
+| float     | integer |                                                                                                                    |
+| float     | long    |                                                                                                                    |
+| float     | double  |                                                                                                                    |
+| float     | string  | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45   |
+| double    | boolean |                                                                                                                    |
+| double    | byte    |                                                                                                                    |
+| double    | short   |                                                                                                                    |
+| double    | integer |                                                                                                                    |
+| double    | long    |                                                                                                                    |
+| double    | float   |                                                                                                                    |
+| double    | string  | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45   |
+| decimal   | byte    |                                                                                                                    |
+| decimal   | short   |                                                                                                                    |
+| decimal   | integer |                                                                                                                    |
+| decimal   | long    |                                                                                                                    |
+| decimal   | float   |                                                                                                                    |
+| decimal   | double  |                                                                                                                    |
+| decimal   | decimal |                                                                                                                    |
+| decimal   | string  | There can be formatting differences in some cases due to Spark using scientific notation where Comet does not     |
+| string    | boolean |                                                                                                                    |
+| string    | byte    |                                                                                                                    |
+| string    | short   |                                                                                                                    |
+| string    | integer |                                                                                                                    |
+| string    | long    |                                                                                                                    |
+| string    | binary  |                                                                                                                    |
+| string    | date    | Only supports years between 262143 BC and 262142 AD                                                               |
+| date      | string  |                                                                                                                    |
+| timestamp | long    |                                                                                                                    |
+| timestamp | string  |                                                                                                                    |
+| timestamp | date    |                                                                                                                    |
 
 ### Incompatible Casts
 
 The following cast operations are not compatible with Spark for all inputs and are disabled by default.
 
-| From Type | To Type | Notes |
-|-|-|-|
-| integer | decimal | No overflow check |
-| long | decimal | No overflow check |
-| float | decimal | There can be rounding differences |
-| double | decimal | There can be rounding differences |
-| string | float | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. |
-| string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. |
-| string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits |
-| string | timestamp | Not all valid formats are supported |
-| binary | string | Only works for binary data representing valid UTF-8 strings |
+| From Type | To Type   | Notes                                                                                                                                                       |
+| --------- | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| integer   | decimal   | No overflow check                                                                                                                                              |
+| long      | decimal   | No overflow check                                                                                                                                              |
+| float     | decimal   | There can be rounding differences                                                                                                                              |
+| double    | decimal   | There can be rounding differences                                                                                                                              |
+| string    | float     | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode.                                                            |
+| string    | double    | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode.                                                            |
+| string    | decimal   | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits    |
+| string    | timestamp | Not all valid formats are supported                                                                                                                            |
+| binary    | string    | Only works for binary data representing valid UTF-8 strings                                                                                                    |
 
 ### Unsupported Casts

From 66a8bb26ffab1d1e1c55a2c9b3e6869bde120cb1 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 11 Jul 2025 13:04:56 -0600
Subject: [PATCH 08/12] scalastyle

---
 spark/src/main/scala/org/apache/comet/serde/arithmetic.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
index fcc57db128..7faa92a52e 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
@@ -23,6 +23,7 @@ import scala.math.min
 
 import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, Divide, EqualTo, EvalMode, Expression, If, IntegralDivide, Literal, Multiply, Remainder, Subtract}
 import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampNTZType, TimestampType}
+
 import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.expressions.CometEvalMode
 import org.apache.comet.serde.QueryPlanSerde.{castToProto, evalModeToProto, exprToProtoInternal, serializeDataType}

From 0b3b4769f400879391023565f6c26f7ea7530f49 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 11 Jul 2025 13:20:58 -0600
Subject: [PATCH 09/12] format

---
 spark/src/main/scala/org/apache/comet/serde/arithmetic.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
index 7faa92a52e..fc7bc62ddc 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
@@ -87,8 +87,8 @@ trait MathBase {
 
   def supportedDataType(dt: DataType): Boolean = dt match {
     case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
-      _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
-      _: DecimalType | _: DateType | _: BooleanType | _: NullType =>
+        _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
+        _: DecimalType | _: DateType | _: BooleanType | _: NullType =>
       true
     case _ => false

From 2070a1b3f9e0095be9f2115ec05ee116918c2268 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 11 Jul 2025 14:25:15 -0600
Subject: [PATCH 10/12] only fall back to Spark for TRY

---
 native/core/src/execution/planner.rs                          | 34 +++++++++----------
 .../org/apache/comet/serde/arithmetic.scala                   | 12 +++----
 2 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index a45a3277bd..ba662ed72d 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -230,9 +230,9 @@ impl PhysicalPlanner {
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         match spark_expr.expr_struct.as_ref().unwrap() {
             ExprStruct::Add(expr) => {
-                let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
-                // TODO add support for other eval modes
-                assert!(eval_mode == EvalMode::Legacy);
+                // TODO respect eval mode
+                // https://github.com/apache/datafusion-comet/issues/2021
+                let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
                     expr.right.as_ref().unwrap(),
@@ -242,9 +242,9 @@ impl PhysicalPlanner {
                 )
             }
             ExprStruct::Subtract(expr) => {
-                let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
-                // TODO add support for other eval modes
-                assert!(eval_mode == EvalMode::Legacy);
+                // TODO respect eval mode
+                // https://github.com/apache/datafusion-comet/issues/2021
+                let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
                     expr.right.as_ref().unwrap(),
@@ -254,9 +254,9 @@ impl PhysicalPlanner {
                 )
             }
             ExprStruct::Multiply(expr) => {
-                let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
-                // TODO add support for other eval modes
-                assert!(eval_mode == EvalMode::Legacy);
+                // TODO respect eval mode
+                // https://github.com/apache/datafusion-comet/issues/2021
+                let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
                     expr.right.as_ref().unwrap(),
@@ -266,9 +266,9 @@ impl PhysicalPlanner {
                 )
             }
             ExprStruct::Divide(expr) => {
-                let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
-                // TODO add support for other eval modes
-                assert!(eval_mode != EvalMode::Try);
+                // TODO respect eval mode
+                // https://github.com/apache/datafusion-comet/issues/2021
+                let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
                     expr.right.as_ref().unwrap(),
@@ -278,9 +278,9 @@ impl PhysicalPlanner {
                 )
             }
             ExprStruct::IntegralDivide(expr) => {
-                let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
-                // TODO add support for other eval modes
-                assert!(eval_mode == EvalMode::Legacy);
+                // TODO respect eval mode
+                // https://github.com/apache/datafusion-comet/issues/2021
+                let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr_with_options(
                     expr.left.as_ref().unwrap(),
                     expr.right.as_ref().unwrap(),
@@ -294,8 +294,8 @@ impl PhysicalPlanner {
             }
             ExprStruct::Remainder(expr) => {
                 let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
-                // TODO add support for other eval modes
-                // assert!(eval_mode == EvalMode::Try);
+                // TODO add support for EvalMode::TRY
+                // https://github.com/apache/datafusion-comet/issues/2021
                 let left =
                     self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
                 let right =
diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
index fc7bc62ddc..d4889a281d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
@@ -106,7 +106,7 @@ object CometAdd extends CometExpressionSerde with MathBase {
       withInfo(add, s"Unsupported datatype ${add.left.dataType}")
       return None
     }
-    if (add.evalMode != EvalMode.LEGACY) {
+    if (add.evalMode == EvalMode.TRY) {
       withInfo(add, s"Eval mode ${add.evalMode} is not supported")
       return None
     }
@@ -132,7 +132,7 @@ object CometSubtract extends CometExpressionSerde with MathBase {
       withInfo(sub, s"Unsupported datatype ${sub.left.dataType}")
       return None
     }
-    if (sub.evalMode != EvalMode.LEGACY) {
+    if (sub.evalMode == EvalMode.TRY) {
       withInfo(sub, s"Eval mode ${sub.evalMode} is not supported")
       return None
     }
@@ -158,7 +158,7 @@ object CometMultiply extends CometExpressionSerde with MathBase {
      withInfo(mul, s"Unsupported datatype ${mul.left.dataType}")
       return None
     }
-    if (mul.evalMode != EvalMode.LEGACY) {
+    if (mul.evalMode == EvalMode.TRY) {
       withInfo(mul, s"Eval mode ${mul.evalMode} is not supported")
       return None
     }
@@ -190,7 +190,7 @@ object CometDivide extends CometExpressionSerde with MathBase {
       withInfo(div, s"Unsupported datatype ${div.left.dataType}")
       return None
     }
-    if (div.evalMode != EvalMode.LEGACY) {
+    if (div.evalMode == EvalMode.TRY) {
       withInfo(div, s"Eval mode ${div.evalMode} is not supported")
       return None
     }
@@ -218,7 +218,7 @@ object CometIntegralDivide extends CometExpressionSerde with MathBase {
       withInfo(div, s"Unsupported datatype ${div.left.dataType}")
       return None
     }
-    if (div.evalMode != EvalMode.LEGACY) {
+    if (div.evalMode == EvalMode.TRY) {
       withInfo(div, s"Eval mode ${div.evalMode} is not supported")
       return None
     }
@@ -275,7 +275,7 @@ object CometRemainder extends CometExpressionSerde with MathBase {
       withInfo(remainder, s"Unsupported datatype ${remainder.left.dataType}")
       return None
     }
-    if (remainder.evalMode != EvalMode.LEGACY) {
+    if (remainder.evalMode == EvalMode.TRY) {
       withInfo(remainder, s"Eval mode ${remainder.evalMode} is not supported")
       return None
     }

From e86b51af972c737e99e4dd3904f1aba8f7c2016e Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 11 Jul 2025 14:27:35 -0600
Subject: [PATCH 11/12] more links

---
 native/core/src/execution/planner.rs | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index ba662ed72d..c2091346d7 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -232,6 +232,7 @@ impl PhysicalPlanner {
             ExprStruct::Add(expr) => {
                 // TODO respect eval mode
                 // https://github.com/apache/datafusion-comet/issues/2021
+                // https://github.com/apache/datafusion-comet/issues/536
                 let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
@@ -244,6 +245,7 @@ impl PhysicalPlanner {
             ExprStruct::Subtract(expr) => {
                 // TODO respect eval mode
                 // https://github.com/apache/datafusion-comet/issues/2021
+                // https://github.com/apache/datafusion-comet/issues/535
                 let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
@@ -256,6 +258,7 @@ impl PhysicalPlanner {
             ExprStruct::Multiply(expr) => {
                 // TODO respect eval mode
                 // https://github.com/apache/datafusion-comet/issues/2021
+                // https://github.com/apache/datafusion-comet/issues/534
                 let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
@@ -268,6 +271,7 @@ impl PhysicalPlanner {
             ExprStruct::Divide(expr) => {
                 // TODO respect eval mode
                 // https://github.com/apache/datafusion-comet/issues/2021
+                // https://github.com/apache/datafusion-comet/issues/533
                 let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr(
                     expr.left.as_ref().unwrap(),
@@ -280,6 +284,7 @@ impl PhysicalPlanner {
             ExprStruct::IntegralDivide(expr) => {
                 // TODO respect eval mode
                 // https://github.com/apache/datafusion-comet/issues/2021
+                // https://github.com/apache/datafusion-comet/issues/533
                 let _eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
                 self.create_binary_expr_with_options(
                     expr.left.as_ref().unwrap(),

From 0a23579717f2be03faeae3a0ee7091e5a5258922 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 19 Jul 2025 09:11:36 +0100
Subject: [PATCH 12/12] update supported types

---
 .../org/apache/comet/serde/arithmetic.scala | 27 ++++++-------------
 1 file changed, 8 insertions(+), 19 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
index d4889a281d..3a7a9f8fb5 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala
@@ -22,7 +22,7 @@ package org.apache.comet.serde
 import scala.math.min
 
 import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, Divide, EqualTo, EvalMode, Expression, If, IntegralDivide, Literal, Multiply, Remainder, Subtract}
-import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampNTZType, TimestampType}
+import org.apache.spark.sql.types.{ByteType, DataType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType}
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.expressions.CometEvalMode
@@ -66,29 +66,18 @@ trait MathBase {
     }
   }
 
-  def nullIfWhenPrimitive(expression: Expression): Expression =
-    if (isPrimitive(expression)) {
-      val zero = Literal.default(expression.dataType)
-      expression match {
-        case _: Literal if expression != zero => expression
-        case _ =>
-          If(EqualTo(expression, zero), Literal.create(null, expression.dataType), expression)
-      }
-    } else {
-      expression
+  def nullIfWhenPrimitive(expression: Expression): Expression = {
+    val zero = Literal.default(expression.dataType)
+    expression match {
+      case _: Literal if expression != zero => expression
+      case _ =>
+        If(EqualTo(expression, zero), Literal.create(null, expression.dataType), expression)
     }
-
-  private def isPrimitive(expression: Expression): Boolean = expression.dataType match {
-    case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
-        _: DoubleType | _: TimestampType | _: DateType | _: BooleanType | _: DecimalType =>
-      true
-    case _ => false
   }
 
   def supportedDataType(dt: DataType): Boolean = dt match {
     case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
-        _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
-        _: DecimalType | _: DateType | _: BooleanType | _: NullType =>
+        _: DoubleType | _: DecimalType =>
       true
     case _ => false
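To see what the simplified zero-guard produces, here is a minimal sketch built directly against Catalyst. The `divisor` attribute is hypothetical; only standard Catalyst classes are used:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, If, Literal}
import org.apache.spark.sql.types.IntegerType

// Hypothetical divisor column `b`; any non-literal divisor takes this path.
val divisor = AttributeReference("b", IntegerType)()
val zero = Literal.default(IntegerType) // Literal(0) for IntegerType

// nullIfWhenPrimitive(divisor) now always expands to: if (b = 0) null else b,
// so the native engine never sees a zero divisor; dividing by zero then yields
// null, as in Spark's legacy mode, rather than raising an error.
val guarded = If(EqualTo(divisor, zero), Literal.create(null, IntegerType), divisor)
```

Narrowing `supportedDataType` to numeric and decimal types appears to be what makes the `isPrimitive` guard removable: `Literal.default` is then always well-defined for the divisor's type.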