Skip to content

Commit 2ba42af

Browse files
authored
fix: Fall back to Spark for RANGE BETWEEN window expressions (#1848)
1 parent 6463153 commit 2ba42af

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
261261
.newBuilder()
262262
.setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build())
263263
.build()
-          case e =>
+          case e if frameType == RowFrame =>
265265
val offset = e.eval() match {
266266
case i: Integer => i.toLong
267267
case l: Long => l
@@ -275,6 +275,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
275275
.setOffset(offset)
276276
.build())
277277
.build()
+          case _ =>
+            // TODO add support for numeric and temporal RANGE BETWEEN expressions
+            // see https://github.com/apache/datafusion-comet/issues/1246
+            return None
278282
}
279283

280284
val uBoundProto = uBound match {
@@ -288,13 +292,12 @@ object QueryPlanSerde extends Logging with CometExprShim {
288292
.newBuilder()
289293
.setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build())
290294
.build()
-          case e =>
+          case e if frameType == RowFrame =>
292296
val offset = e.eval() match {
293297
case i: Integer => i.toLong
294298
case l: Long => l
295299
case _ => return None
296300
}
297-
298301
OperatorOuterClass.UpperWindowFrameBound
299302
.newBuilder()
300303
.setFollowing(
@@ -303,6 +306,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
303306
.setOffset(offset)
304307
.build())
305308
.build()
+          case _ =>
+            // TODO add support for numeric and temporal RANGE BETWEEN expressions
+            // see https://github.com/apache/datafusion-comet/issues/1246
+            return None
306313
}
307314

308315
(frameProto, lBoundProto, uBoundProto)

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,18 @@ package org.apache.comet
2121

2222
import java.time.{Duration, Period}
2323

+ import scala.collection.immutable.Seq
2425
import scala.reflect.ClassTag
2526
import scala.reflect.runtime.universe.TypeTag
2627
import scala.util.Random
2728

2829
import org.apache.hadoop.fs.Path
2930
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
3031
import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
- import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec}
+ import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, CometWindowExec}
3233
import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, WholeStageCodegenExec}
3334
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.expressions.Window
3436
import org.apache.spark.sql.functions._
3537
import org.apache.spark.sql.internal.SQLConf
3638
import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
@@ -2705,4 +2707,25 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
27052707
}
27062708
}
27072709

+  test("window query with rangeBetween") {
+
+    // values are int
+    val df = Seq(1, 2, 4, 3, 2, 1).toDF("value")
+    val window = Window.orderBy($"value".desc)
+
+    // ranges are long
+    val df2 = df.select(
+      $"value",
+      sum($"value").over(window.rangeBetween(Window.unboundedPreceding, 1L)),
+      sum($"value").over(window.rangeBetween(1L, Window.unboundedFollowing)))
+
+    // Comet does not support RANGE BETWEEN
+    // https://github.com/apache/datafusion-comet/issues/1246
+    val (_, cometPlan) = checkSparkAnswer(df2)
+    val cometWindowExecs = collect(cometPlan) { case w: CometWindowExec =>
+      w
+    }
+    assert(cometWindowExecs.isEmpty)
+  }
27082731
}

0 commit comments

Comments
 (0)