Skip to content

Commit 87ef44c

Browse files
authored (author name not captured in this page extraction)
minor: Replace many instances of checkSparkAnswer with checkSparkAnswerAndOperator (#1851)
1 parent 2ba42af · commit 87ef44c

File tree

4 files changed

+48
-49
lines changed

4 files changed

+48
-49
lines changed

spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path
2626
import org.apache.spark.sql.CometTestBase
2727
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2828
import org.apache.spark.sql.functions.{array, col, expr, lit, udf}
29-
import org.apache.spark.sql.types.StructType
3029

3130
import org.apache.comet.CometSparkSessionExtensions.isSpark35Plus
3231
import org.apache.comet.serde.CometArrayExcept
@@ -399,12 +398,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
399398
sql(s"SELECT array($fieldName, $fieldName) as a, array($fieldName) as b FROM t1")
400399
.createOrReplaceTempView("t2")
401400
val df = sql("SELECT array_except(a, b) FROM t2")
402-
field.dataType match {
403-
case _: StructType =>
404-
// skip due to https://github.com/apache/datafusion-comet/issues/1314
405-
case _ =>
406-
checkSparkAnswer(df)
407-
}
401+
checkSparkAnswer(df)
408402
}
409403
}
410404
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
221221
val qry = "select _9 from tbl order by _11"
222222
if (usingDataSourceExec(conf)) {
223223
if (!allowIncompatible) {
224-
checkSparkAnswer(qry)
224+
checkSparkAnswerAndOperator(qry)
225225
} else {
226226
// need to convert the values to unsigned values
227227
val expected = (Byte.MinValue to Byte.MaxValue)

spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
7070
val data: Seq[(Float, Int)] = Seq((Float.PositiveInfinity, 1))
7171
withParquetTable(data, "tbl", false) {
7272
val df = sql("SELECT stddev_pop(_1), stddev_pop(_2) FROM tbl")
73-
checkSparkAnswer(df)
73+
checkSparkAnswerAndOperator(df)
7474
}
7575
}
7676
}
@@ -199,7 +199,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
199199
makeParquetFile(path, 10000, 10, false)
200200
withParquetTable(path.toUri.toString, "tbl") {
201201
val df = sql("SELECT _g5 FROM tbl GROUP BY _g1, _g2, _g3, _g4, _g5")
202-
checkSparkAnswer(df)
202+
checkSparkAnswerAndOperator(df)
203203
}
204204
}
205205
}
@@ -216,7 +216,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
216216
makeParquetFile(path, 10000, 10, dictionaryEnabled)
217217
withParquetTable(path.toUri.toString, "tbl") {
218218
val df = sql("SELECT * FROM tbl").groupBy("_g1").agg(sum($"_3" + $"_g3"))
219-
checkSparkAnswer(df)
219+
checkSparkAnswerAndOperator(df)
220220
}
221221
}
222222
}
@@ -271,7 +271,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
271271
val path = new Path(dir.toURI.toString, "test")
272272
makeParquetFile(path, 10000, 10, dictionaryEnabled)
273273
withParquetTable(path.toUri.toString, "tbl") {
274-
checkSparkAnswer(sql("SELECT * FROM tbl").sort("_g1").groupBy("_g1").agg(sum("_8")))
274+
checkSparkAnswerAndOperator(
275+
sql("SELECT * FROM tbl").sort("_g1").groupBy("_g1").agg(sum("_8")))
275276
}
276277
}
277278
}
@@ -323,7 +324,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
323324
val df3 = sql("SELECT COUNT(_1), COUNT(_2) FROM tbl")
324325
checkAnswer(df3, Row(5, 5) :: Nil)
325326

326-
checkSparkAnswer("SELECT _2, MIN(_1), MAX(_1) FROM tbl GROUP BY _2")
327+
checkSparkAnswerAndOperator("SELECT _2, MIN(_1), MAX(_1) FROM tbl GROUP BY _2")
327328
}
328329
}
329330
}
@@ -335,8 +336,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
335336
"tbl",
336337
dictionaryEnabled) {
337338

338-
checkSparkAnswer("SELECT _2, AVG(_1) FROM tbl GROUP BY _2")
339-
checkSparkAnswer("SELECT AVG(_2) FROM tbl")
339+
checkSparkAnswerAndOperator("SELECT _2, AVG(_1) FROM tbl GROUP BY _2")
340+
checkSparkAnswerAndOperator("SELECT AVG(_2) FROM tbl")
340341
}
341342
}
342343
}
@@ -348,10 +349,10 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
348349
withTable(table) {
349350
sql(s"create table $table(col1 int, col2 int) using parquet")
350351
sql(s"insert into $table values(1, 1), (2, 1), (3, 2), (null, 2), (null, 1)")
351-
checkSparkAnswer(s"SELECT COUNT(col1) FROM $table")
352-
checkSparkAnswer(s"SELECT col2, COUNT(col1) FROM $table GROUP BY col2")
353-
checkSparkAnswer(s"SELECT avg(col1) FROM $table")
354-
checkSparkAnswer(s"SELECT col2, avg(col1) FROM $table GROUP BY col2")
352+
checkSparkAnswerAndOperator(s"SELECT COUNT(col1) FROM $table")
353+
checkSparkAnswerAndOperator(s"SELECT col2, COUNT(col1) FROM $table GROUP BY col2")
354+
checkSparkAnswerAndOperator(s"SELECT avg(col1) FROM $table")
355+
checkSparkAnswerAndOperator(s"SELECT col2, avg(col1) FROM $table GROUP BY col2")
355356
}
356357
}
357358
}
@@ -360,8 +361,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
360361
test("SUM/AVG non-decimal overflow") {
361362
Seq(true, false).foreach { dictionaryEnabled =>
362363
withParquetTable(Seq((0, 100.toLong), (0, Long.MaxValue)), "tbl", dictionaryEnabled) {
363-
checkSparkAnswer("SELECT SUM(_2) FROM tbl GROUP BY _1")
364-
checkSparkAnswer("SELECT AVG(_2) FROM tbl GROUP BY _1")
364+
checkSparkAnswerAndOperator("SELECT SUM(_2) FROM tbl GROUP BY _1")
365+
checkSparkAnswerAndOperator("SELECT AVG(_2) FROM tbl GROUP BY _1")
365366
}
366367
}
367368
}
@@ -373,8 +374,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
373374
checkAnswer(df1, Row(0, 6) :: Row(1, 4) :: Nil)
374375
val df2 = sql("SELECT _2, COUNT(_1) FROM tbl GROUP BY _2")
375376
checkAnswer(df2, Row(0, 3) :: Row(1, 2) :: Nil)
376-
checkSparkAnswer("SELECT _2, MIN(_1), MAX(_1) FROM tbl GROUP BY _2")
377-
checkSparkAnswer("SELECT _2, AVG(_1) FROM tbl GROUP BY _2")
377+
checkSparkAnswerAndOperator("SELECT _2, MIN(_1), MAX(_1) FROM tbl GROUP BY _2")
378+
checkSparkAnswerAndOperator("SELECT _2, AVG(_1) FROM tbl GROUP BY _2")
378379
}
379380
}
380381
}
@@ -417,7 +418,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
417418

418419
val df3 = sql("SELECT _2, MIN(_1), MAX(_1) FROM tbl GROUP BY _2")
419420
checkAnswer(df3, Row(null.asInstanceOf[Int], 0, 9) :: Row(1, 1, 7) :: Row(2, 2, 8) :: Nil)
420-
checkSparkAnswer(sql("SELECT _2, AVG(_1) FROM tbl GROUP BY _2"))
421+
checkSparkAnswerAndOperator(sql("SELECT _2, AVG(_1) FROM tbl GROUP BY _2"))
421422
}
422423
}
423424
}
@@ -441,7 +442,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
441442
val df3 = sql("SELECT _2, MIN(_1), MAX(_1) FROM tbl GROUP BY _2")
442443
checkAnswer(df3, Row(null.asInstanceOf[Int], 0, 9) :: Row(1, 0, 7) :: Row(2, 0, 5) :: Nil)
443444

444-
checkSparkAnswer(sql("SELECT _2, AVG(_1) FROM tbl GROUP BY _2"))
445+
checkSparkAnswerAndOperator(sql("SELECT _2, AVG(_1) FROM tbl GROUP BY _2"))
445446
}
446447
}
447448
}
@@ -469,7 +470,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
469470
Row(null.asInstanceOf[Int], null.asInstanceOf[Int], null.asInstanceOf[Int]),
470471
Row(1, null.asInstanceOf[Int], null.asInstanceOf[Int]),
471472
Row(2, null.asInstanceOf[Int], null.asInstanceOf[Int])))
472-
checkSparkAnswer(sql("SELECT _2, SUM(_1) FROM tbl GROUP BY _2"))
473+
checkSparkAnswerAndOperator(sql("SELECT _2, SUM(_1) FROM tbl GROUP BY _2"))
473474
}
474475
}
475476
}
@@ -480,9 +481,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
480481
val path = new Path(dir.toURI.toString, "test")
481482
makeParquetFile(path, 1000, 10, dictionaryEnabled)
482483
withParquetTable(path.toUri.toString, "tbl") {
483-
checkSparkAnswer(
484+
checkSparkAnswerAndOperator(
484485
"SELECT _g5, SUM(_5), COUNT(_5), MIN(_5), MAX(_5), AVG(_5) FROM tbl GROUP BY _g5")
485-
checkSparkAnswer(
486+
checkSparkAnswerAndOperator(
486487
"SELECT _g6, SUM(_6), COUNT(_6), MIN(_6), MAX(_6), AVG(_6) FROM tbl GROUP BY _g6")
487488
}
488489
}
@@ -548,13 +549,13 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
548549
makeParquetFile(path, 1000, 10, dictionaryEnabled)
549550
withParquetTable(path.toUri.toString, "tbl") {
550551
Seq("SUM", "AVG").foreach { FN =>
551-
checkSparkAnswer(
552+
checkSparkAnswerAndOperator(
552553
s"SELECT _g1, $FN(_8 + CAST(1 AS DECIMAL(20, 10))) FROM tbl GROUP BY _g1")
553-
checkSparkAnswer(
554+
checkSparkAnswerAndOperator(
554555
s"SELECT _g1, $FN(_8 - CAST(-1 AS DECIMAL(10, 3))) FROM tbl GROUP BY _g1")
555-
checkSparkAnswer(
556+
checkSparkAnswerAndOperator(
556557
s"SELECT _g1, $FN(_9 * CAST(3.14 AS DECIMAL(4, 3))) FROM tbl GROUP BY _g1")
557-
checkSparkAnswer(
558+
checkSparkAnswerAndOperator(
558559
s"SELECT _g1, $FN(_9 / CAST(1.2345 AS DECIMAL(35, 10))) FROM tbl GROUP BY _g1")
559560
}
560561
}
@@ -605,9 +606,12 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
605606
val path = new Path(dir.toURI.toString, "test")
606607
makeParquetFile(path, 1000, 10, dictionaryEnabled)
607608
withParquetTable(path.toUri.toString, "tbl") {
608-
checkSparkAnswer("SELECT _g1, COUNT(_10), MIN(_10), MAX(_10) FROM tbl GROUP BY _g1")
609-
checkSparkAnswer("SELECT _g1, COUNT(_11), MIN(_11), MAX(_11) FROM tbl GROUP BY _g1")
610-
checkSparkAnswer("SELECT _g1, COUNT(_12), MIN(_12), MAX(_12) FROM tbl GROUP BY _g1")
609+
checkSparkAnswerAndOperator(
610+
"SELECT _g1, COUNT(_10), MIN(_10), MAX(_10) FROM tbl GROUP BY _g1")
611+
checkSparkAnswerAndOperator(
612+
"SELECT _g1, COUNT(_11), MIN(_11), MAX(_11) FROM tbl GROUP BY _g1")
613+
checkSparkAnswerAndOperator(
614+
"SELECT _g1, COUNT(_12), MIN(_12), MAX(_12) FROM tbl GROUP BY _g1")
611615
}
612616
}
613617
}
@@ -629,7 +633,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
629633
dictionaryEnabled) {
630634
withView("v") {
631635
sql("CREATE TEMP VIEW v AS SELECT _1, _2 FROM tbl ORDER BY _1")
632-
checkSparkAnswer(
636+
checkSparkAnswerAndOperator(
633637
"SELECT _2, SUM(_1), SUM(DISTINCT _1), MIN(_1), MAX(_1), COUNT(_1)," +
634638
" COUNT(DISTINCT _1), AVG(_1), FIRST(_1), LAST(_1) FROM v GROUP BY _2")
635639
}
@@ -656,13 +660,13 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
656660
withParquetTable(path.toUri.toString, "tbl") {
657661
withView("v") {
658662
sql("CREATE TEMP VIEW v AS SELECT _g1, _g2, _3 FROM tbl ORDER BY _3")
659-
checkSparkAnswer(
663+
checkSparkAnswerAndOperator(
660664
"SELECT _g1, _g2, FIRST(_3) FROM v GROUP BY _g1, _g2 ORDER BY _g1, _g2")
661-
checkSparkAnswer(
665+
checkSparkAnswerAndOperator(
662666
"SELECT _g1, _g2, LAST(_3) FROM v GROUP BY _g1, _g2 ORDER BY _g1, _g2")
663-
checkSparkAnswer(
667+
checkSparkAnswerAndOperator(
664668
"SELECT _g1, _g2, FIRST(_3) IGNORE NULLS FROM v GROUP BY _g1, _g2 ORDER BY _g1, _g2")
665-
checkSparkAnswer(
669+
checkSparkAnswerAndOperator(
666670
"SELECT _g1, _g2, LAST(_3) IGNORE NULLS FROM v GROUP BY _g1, _g2 ORDER BY _g1, _g2")
667671
}
668672
}
@@ -732,9 +736,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
732736
withParquetTable(path.toUri.toString, "tbl") {
733737
withView("v") {
734738
sql("CREATE TEMP VIEW v AS SELECT _g3, _g4, _3, _4 FROM tbl ORDER BY _3, _4")
735-
checkSparkAnswer(
739+
checkSparkAnswerAndOperator(
736740
"SELECT _g3, _g4, FIRST(_3), FIRST(_4) FROM v GROUP BY _g3, _g4 ORDER BY _g3, _g4")
737-
checkSparkAnswer(
741+
checkSparkAnswerAndOperator(
738742
"SELECT _g3, _g4, LAST(_3), LAST(_4) FROM v GROUP BY _g3, _g4 ORDER BY _g3, _g4")
739743
}
740744
}
@@ -799,8 +803,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
799803
withView("v") {
800804
sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _1, _2, _3, _4 " +
801805
"FROM tbl ORDER BY _1, _2, _3, _4")
802-
checkSparkAnswer(s"SELECT _g$gCol, FIRST(_1), FIRST(_2), FIRST(_3), " +
803-
s"FIRST(_4), LAST(_1), LAST(_2), LAST(_3), LAST(_4) FROM v GROUP BY _g$gCol ORDER BY _g$gCol")
806+
checkSparkAnswerAndOperator(
807+
s"SELECT _g$gCol, FIRST(_1), FIRST(_2), FIRST(_3), " +
808+
s"FIRST(_4), LAST(_1), LAST(_2), LAST(_3), LAST(_4) FROM v GROUP BY _g$gCol ORDER BY _g$gCol")
804809
}
805810
}
806811
}
@@ -1021,7 +1026,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
10211026
sql(s"insert into $table values(2, null)")
10221027

10231028
val query = sql(s"select a, AVG(b) from $table GROUP BY a")
1024-
checkSparkAnswer(query)
1029+
checkSparkAnswerAndOperator(query)
10251030
}
10261031
}
10271032
}

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class CometExecSuite extends CometTestBase {
8181
.create()
8282

8383
val df = sql("SELECT * FROM test_data ORDER BY c1 LIMIT 3")
84-
checkSparkAnswer(df)
84+
checkSparkAnswerAndOperator(df)
8585
}
8686
}
8787
}
@@ -255,7 +255,7 @@ class CometExecSuite extends CometTestBase {
255255
|)
256256
|SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
257257
|""".stripMargin)
258-
checkSparkAnswer(df)
258+
checkSparkAnswerAndOperator(df)
259259
}
260260
}
261261

@@ -766,7 +766,7 @@ class CometExecSuite extends CometTestBase {
766766
|""".stripMargin
767767

768768
val df = sql(query)
769-
checkSparkAnswer(df)
769+
checkSparkAnswerAndOperator(df)
770770
val exchanges = stripAQEPlan(df.queryExecution.executedPlan).collect {
771771
case s: CometShuffleExchangeExec if s.shuffleType == CometColumnarShuffle =>
772772
s
@@ -826,7 +826,7 @@ class CometExecSuite extends CometTestBase {
826826
| GROUP BY a._1) t
827827
|JOIN tbl_c c ON t.a1 = c._1
828828
|""".stripMargin)
829-
checkSparkAnswer(df)
829+
checkSparkAnswerAndOperator(df)
830830

831831
// Before AQE: one CometBroadcastExchange, no CometColumnarToRow
832832
var columnarToRowExec = stripAQEPlan(df.queryExecution.executedPlan).collect {

0 commit comments

Comments (0)