Comet cannot execute some Iceberg metadata table queries #2033

@parthchandra

Description

Describe the bug

The issue and the steps to reproduce come from
#1987 (comment)

Many Iceberg Spark SQL tests fail during validation because a simple aggregation query against a metadata table fails.

Steps to reproduce

  1. Follow the official guide to build Comet and Iceberg, configure Spark shell and populate the Iceberg table: https://datafusion.apache.org/comet/user-guide/iceberg.html
  2. Query an Iceberg metadata table with an aggregation operator. For example:
    -- default is the catalog name used in the local HadoopCatalog setup
scala> spark.sql(s"SELECT COUNT(*) from default.t1.snapshots").show()

25/07/15 13:06:16 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.ClassCastException: class org.apache.iceberg.spark.source.StructInternalRow cannot be cast to class org.apache.spark.sql.vectorized.ColumnarBatch (org.apache.iceberg.spark.source.StructInternalRow is in unnamed module of loader scala.reflect.internal.util.ScalaClassLoader$URLClassLoader @19ac93d2; org.apache.spark.sql.vectorized.ColumnarBatch is in unnamed module of loader 'app')
	at org.apache.spark.sql.comet.CometBatchScanExec$$anon$1.next(CometBatchScanExec.scala:68)
	at org.apache.spark.sql.comet.CometBatchScanExec$$anon$1.next(CometBatchScanExec.scala:57)
	at org.apache.comet.CometBatchIterator.hasNext(CometBatchIterator.java:51)
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2(CometExecIterator.scala:155)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2$adapted(CometExecIterator.scala:154)
	at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:157)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:154)
	at org.apache.comet.Tracing$.withTrace(Tracing.scala:31)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:152)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:203)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.comet.CometBatchIterator.hasNext(CometBatchIterator.java:50)
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2(CometExecIterator.scala:155)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2$adapted(CometExecIterator.scala:154)
	at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:157)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:154)
	at org.apache.comet.Tracing$.withTrace(Tracing.scala:31)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:152)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:203)
	at org.apache.spark.sql.comet.execution.shuffle.CometNativeShuffleWriter.write(CometNativeShuffleWriter.scala:106)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Expected behavior

No response

Additional context

The issue appears to be that, for this kind of query, Iceberg creates a SparkRowReaderFactory, which returns data as InternalRow, while the downstream Comet operator expects columnar data (ColumnarBatch). The unconditional cast to ColumnarBatch in CometBatchScanExec then fails at runtime.
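The shape of the failure can be sketched without Spark at all. In this minimal sketch, RowLike and BatchLike are hypothetical stand-ins for Iceberg's StructInternalRow and Spark's ColumnarBatch (none of this is Comet's actual code): an iterator statically typed to a common supertype yields a row object, and the consumer's unconditional cast to the batch type throws the same ClassCastException seen in the stack trace above.

```scala
// Stand-ins for org.apache.iceberg.spark.source.StructInternalRow and
// org.apache.spark.sql.vectorized.ColumnarBatch. They are unrelated classes,
// just like the real ones, so a cast between them fails at runtime.
final class RowLike
final class BatchLike

object CastSketch {
  def main(args: Array[String]): Unit = {
    // The scan's iterator is declared to yield "anything" (in the real code,
    // the erased element type), but the row-based reader factory actually
    // produced a row object...
    val iter: Iterator[AnyRef] = Iterator(new RowLike)

    // ...so the unconditional cast performed by the columnar consumer
    // (CometBatchScanExec's iterator in the reported trace) blows up.
    val failed =
      try { iter.next().asInstanceOf[BatchLike]; false }
      catch { case _: ClassCastException => true }

    assert(failed, "expected a ClassCastException, mirroring the stack trace")
    println("ClassCastException reproduced as expected")
  }
}
```

A fix along these lines would presumably need Comet to detect row-producing scans (such as metadata table scans) and either fall back to Spark's row-based execution or convert rows to batches before handing them to native operators.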
