From b640cbe739af0f31a5ffdb4e9390c389c6a6b88c Mon Sep 17 00:00:00 2001
From: Ruchir28
Date: Fri, 27 Jun 2025 14:10:58 +0530
Subject: [PATCH] chore: Remove obsolete supportedSortType function after Arrow updates

---
 .../apache/comet/serde/QueryPlanSerde.scala | 39 -------
 .../CometTakeOrderedAndProjectExec.scala    |  4 +-
 .../apache/comet/exec/CometExecSuite.scala  |  6 +--
 3 files changed, 4 insertions(+), 45 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 1e3cec2852..8fbea247d5 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2335,10 +2335,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
         }
 
       case SortExec(sortOrder, _, child, _) if CometConf.COMET_EXEC_SORT_ENABLED.get(conf) =>
-        if (!supportedSortType(op, sortOrder)) {
-          return None
-        }
-
         val sortOrders = sortOrder.map(exprToProto(_, child.output))
 
         if (sortOrders.forall(_.isDefined) && childOp.nonEmpty) {
@@ -2939,41 +2935,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
     }
   }
 
-  // TODO: Remove this constraint when we upgrade to new arrow-rs including
-  // https://github.com/apache/arrow-rs/pull/6225
-  def supportedSortType(op: SparkPlan, sortOrder: Seq[SortOrder]): Boolean = {
-    def canRank(dt: DataType): Boolean = {
-      dt match {
-        case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
-            _: DoubleType | _: TimestampType | _: DecimalType | _: DateType =>
-          true
-        case _: BinaryType | _: StringType => true
-        case _ => false
-      }
-    }
-
-    if (sortOrder.length == 1) {
-      val canSort = sortOrder.head.dataType match {
-        case _: BooleanType => true
-        case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
-            _: DoubleType | _: TimestampType | _: TimestampNTZType | _: DecimalType |
-            _: DateType =>
-          true
-        case _: BinaryType | _: StringType => true
-        case ArrayType(elementType, _) => canRank(elementType)
-        case _ => false
-      }
-      if (!canSort) {
-        withInfo(op, s"Sort on single column of type ${sortOrder.head.dataType} is not supported")
-        false
-      } else {
-        true
-      }
-    } else {
-      true
-    }
-  }
-
   private def validatePartitionAndSortSpecsForWindowFunc(
       partitionSpec: Seq[Expression],
       orderSpec: Seq[SortOrder],
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 8852538687..6748629eb6 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleR
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import org.apache.comet.serde.QueryPlanSerde.exprToProto
-import org.apache.comet.serde.QueryPlanSerde.supportedSortType
 
 /**
  * Comet physical plan node for Spark `TakeOrderedAndProjectExec`.
@@ -134,7 +133,6 @@ object CometTakeOrderedAndProjectExec {
   def isSupported(plan: TakeOrderedAndProjectExec): Boolean = {
     val exprs = plan.projectList.map(exprToProto(_, plan.child.output))
     val sortOrders = plan.sortOrder.map(exprToProto(_, plan.child.output))
-    exprs.forall(_.isDefined) && sortOrders.forall(_.isDefined) && plan.offset == 0 &&
-      supportedSortType(plan, plan.sortOrder)
+    exprs.forall(_.isDefined) && sortOrders.forall(_.isDefined) && plan.offset == 0
   }
 }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 5c458c27bb..281dee4ca9 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -167,7 +167,7 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
-  test("Sort on single struct should fallback to Spark") {
+  test("Sort on single struct should work in Comet") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
@@ -180,7 +180,7 @@ class CometExecSuite extends CometTestBase {
     withParquetFile(data1) { file =>
       readParquetFile(file) { df =>
         val sort = df.sort("_1")
-        checkSparkAnswer(sort)
+        checkSparkAnswerAndOperator(sort)
       }
     }
 
@@ -195,7 +195,7 @@ class CometExecSuite extends CometTestBase {
     withParquetFile(data2) { file =>
      readParquetFile(file) { df =>
        val sort = df.sort("_1")
-        checkSparkAnswer(sort)
+        checkSparkAnswerAndOperator(sort)
      }
     }
   }
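-- 
Not part of the commit: a minimal smoke test, mirroring the updated suite, for
eyeballing the new behavior in a local session. It assumes a Spark build with
the Comet plugin on the classpath; the object name, temp path, and sample rows
below are made up for illustration, and only the standard spark.comet.* configs
are used.

// StructSortSmokeTest.scala - hypothetical standalone check; sorts on a
// single struct column, which previously forced a fallback to Spark.
import org.apache.spark.sql.SparkSession

object StructSortSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config("spark.comet.enabled", "true")
      .config("spark.comet.exec.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Comet accelerates Parquet-backed scans, so write the data out first,
    // as the suite's withParquetFile helper does.
    val path = "/tmp/comet-struct-sort-demo"
    Seq((3, "c"), (1, "a"), (2, "b"))
      .toDF("id", "name")
      .selectExpr("named_struct('id', id, 'name', name) AS _1")
      .write.mode("overwrite").parquet(path)

    val sorted = spark.read.parquet(path).sort("_1")
    sorted.explain() // expect a Comet sort operator, not a Spark SortExec fallback
    sorted.show()

    spark.stop()
  }
}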