-
Notifications
You must be signed in to change notification settings - Fork 225
Chore: Improve array contains test coverage #2030
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,7 @@ import scala.util.Random | |
import org.apache.hadoop.fs.Path | ||
import org.apache.spark.sql.CometTestBase | ||
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper | ||
import org.apache.spark.sql.functions.{array, col, expr, lit, udf} | ||
import org.apache.spark.sql.functions._ | ||
|
||
import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus} | ||
import org.apache.comet.serde.CometArrayExcept | ||
|
@@ -218,16 +218,111 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp | |
} | ||
} | ||
|
||
test("array_contains") { | ||
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { | ||
withTempDir { dir => | ||
val path = new Path(dir.toURI.toString, "test.parquet") | ||
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, n = 10000) | ||
spark.read.parquet(path.toString).createOrReplaceTempView("t1"); | ||
test("array_contains - int values") { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would comment that INTs are in separate tests, as ints require the incompatible flag. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oops, I forgot to disable this setting. |
||
withTempDir { dir => | ||
val path = new Path(dir.toURI.toString, "test.parquet") | ||
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, n = 10000) | ||
spark.read.parquet(path.toString).createOrReplaceTempView("t1"); | ||
checkSparkAnswerAndOperator( | ||
spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1")) | ||
checkSparkAnswerAndOperator( | ||
spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); | ||
} | ||
} | ||
|
||
test("array_contains - test all types (native Parquet reader)") { | ||
withTempDir { dir => | ||
val path = new Path(dir.toURI.toString, "test.parquet") | ||
val filename = path.toString | ||
val random = new Random(42) | ||
withSQLConf(CometConf.COMET_ENABLED.key -> "false") { | ||
ParquetGenerator.makeParquetFile( | ||
random, | ||
spark, | ||
filename, | ||
100, | ||
DataGenOptions( | ||
allowNull = true, | ||
generateNegativeZero = true, | ||
generateArray = false, | ||
generateStruct = false, | ||
generateMap = false)) | ||
} | ||
val table = spark.read.parquet(filename) | ||
table.createOrReplaceTempView("t1") | ||
for (field <- table.schema.fields) { | ||
val fieldName = field.name | ||
val typeName = field.dataType.typeName | ||
sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") | ||
.createOrReplaceTempView("t2") | ||
checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2")) | ||
checkSparkAnswerAndOperator( | ||
spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1")) | ||
sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2")) | ||
} | ||
} | ||
} | ||
|
||
// https://github.com/apache/datafusion-comet/issues/1929 | ||
ignore("array_contains - array literals") { | ||
withTempDir { dir => | ||
val path = new Path(dir.toURI.toString, "test.parquet") | ||
val filename = path.toString | ||
val random = new Random(42) | ||
withSQLConf(CometConf.COMET_ENABLED.key -> "false") { | ||
ParquetGenerator.makeParquetFile( | ||
random, | ||
spark, | ||
filename, | ||
100, | ||
DataGenOptions( | ||
allowNull = true, | ||
generateNegativeZero = true, | ||
generateArray = false, | ||
generateStruct = false, | ||
generateMap = false)) | ||
} | ||
val table = spark.read.parquet(filename) | ||
for (field <- table.schema.fields) { | ||
val typeName = field.dataType.typeName | ||
checkSparkAnswerAndOperator( | ||
spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); | ||
sql(s"SELECT array_contains(cast(null as array<$typeName>), b) FROM t2")) | ||
checkSparkAnswerAndOperator(sql( | ||
s"SELECT array_contains(cast(array() as array<$typeName>), cast(null as $typeName)) FROM t2")) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Array literals might wait for #1977. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The literals are now in a dedicated test marked for exclusion ("ignored"). |
||
checkSparkAnswerAndOperator(sql("SELECT array_contains(array(), 1) FROM t2")) | ||
} | ||
} | ||
} | ||
|
||
test("array_contains - test all types (convert from Parquet)") { | ||
withTempDir { dir => | ||
val path = new Path(dir.toURI.toString, "test.parquet") | ||
val filename = path.toString | ||
val random = new Random(42) | ||
withSQLConf(CometConf.COMET_ENABLED.key -> "false") { | ||
ParquetGenerator.makeParquetFile( | ||
random, | ||
spark, | ||
filename, | ||
100, | ||
DataGenOptions( | ||
allowNull = true, | ||
generateNegativeZero = true, | ||
generateArray = true, | ||
generateStruct = true, | ||
generateMap = false)) | ||
} | ||
withSQLConf( | ||
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false", | ||
CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true", | ||
CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") { | ||
val table = spark.read.parquet(filename) | ||
table.createOrReplaceTempView("t1") | ||
for (field <- table.schema.fields) { | ||
val fieldName = field.name | ||
sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1") | ||
.createOrReplaceTempView("t2") | ||
checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t2")) | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @andygrove as Andy introduced the `IncompatExpr` trait here