
fix: [iceberg] Switch to OSS Spark and run Iceberg Spark tests in parallel #1987


Open · wants to merge 10 commits into main

Conversation

@hsiang-c (Contributor) commented Jul 3, 2025

Which issue does this PR close?

Closes #1685.

Rationale for this change

When we enabled Iceberg Spark tests with Comet-enabled Spark in #1715:

  1. We didn't enable CometShuffleManager; this PR fixes that.
  2. We implicitly loaded org.apache.comet.CometSparkSessionExtensions because Iceberg depended on the patched Spark. This PR explicitly configures every SparkSession.Builder with .config("spark.plugins", "org.apache.spark.CometPlugin") so that we can depend on OSS Spark.
  3. The patch we applied to SparkPlanInfo.scala only affects the plan reported to the ListenerBus event, so switching to OSS Spark is safe.
  4. Split the Iceberg Spark tests into 3 GitHub Actions jobs and run them in parallel. ENABLE_COMET is true for all 3 jobs.
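Points 1 and 2 above can be sketched as follows. This is a minimal illustration, not this PR's actual diff: the spark.plugins value comes from this PR, while the local master and the shuffle-manager keys are assumptions based on a typical Comet setup as described in the Comet user guide.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a local master. Only spark.plugins is taken
// from this PR; the shuffle settings are assumptions from the Comet docs.
val spark = SparkSession.builder()
  .master("local[*]")
  // Load Comet explicitly so plain OSS Spark picks it up (point 2).
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  // Use Comet's shuffle manager instead of Spark's default (point 1).
  .config("spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
  .config("spark.comet.exec.shuffle.enabled", "true")
  .getOrCreate()
```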

Thanks to @andygrove for pointing this out.

What changes are included in this PR?

How are these changes tested?

@hsiang-c hsiang-c changed the title fix: [iceberg] Enable CometShuffleManager in Iceberg Spark tests fix: [iceberg] Switch to OSS Spark and run Iceberg Spark tests in parallel Jul 3, 2025
@kazuyukitanimura (Contributor) left a comment


pending CI

.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
.config(
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean()))
+ .config("spark.plugins", "org.apache.spark.CometPlugin")

This makes sense, but it could be error-prone: if a new test creates its own Spark session, we would miss enabling the plugin there.
Wondering if there is a good way to update all Spark sessions at once...

@hsiang-c (Contributor Author) replied:

@kazuyukitanimura

We're lucky in some cases because TestBase and ExtensionsTestBase consolidate the SparkSession.Builder in an abstract class.

Unfortunately, other test classes and the JMH benchmarks build their own SparkSession each time :(
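One hypothetical way to reduce that risk (not part of this PR): funnel all test session construction through a single shared helper, so new tests and benchmarks inherit the Comet settings automatically. The helper name below is invented for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper, not in this PR: centralizes the Comet config
// so individual test classes and JMH benchmarks cannot forget it.
object CometTestSessions {
  def baseBuilder(): SparkSession.Builder =
    SparkSession.builder()
      .config("spark.plugins", "org.apache.spark.CometPlugin")
}

// A test would then start from the shared builder:
// val spark = CometTestSessions.baseBuilder().master("local[*]").getOrCreate()
```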

@kazuyukitanimura (Contributor) left a comment

pending CI

@codecov-commenter commented Jul 3, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 58.20%. Comparing base (f09f8af) to head (739cfc3).
Report is 312 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1987      +/-   ##
============================================
+ Coverage     56.12%   58.20%   +2.07%     
- Complexity      976     1152     +176     
============================================
  Files           119      133      +14     
  Lines         11743    13039    +1296     
  Branches       2251     2419     +168     
============================================
+ Hits           6591     7589     +998     
- Misses         4012     4216     +204     
- Partials       1140     1234      +94     

☔ View full report in Codecov by Sentry.

@andygrove (Member) commented Jul 8, 2025

I see that some tests are failing. I didn't run into this specific issue during my testing.

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1764.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1764.0 (TID 3312) (localhost executor driver): 
java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to class org.apache.spark.sql.vectorized.ColumnarBatch (org.apache.spark.sql.catalyst.expressions.GenericInternalRow and org.apache.spark.sql.vectorized.ColumnarBatch are in unnamed module of loader 'app')

@hsiang-c (Contributor Author) commented:

Most of the exceptions in the Iceberg Spark SQL tests can be reproduced as follows:

  1. Follow the official guide to build Comet and Iceberg, configure the Spark shell, and populate an Iceberg table: https://datafusion.apache.org/comet/user-guide/iceberg.html
  2. Query an Iceberg metadata table with an operator. Here is an example:
-- default is the catalog name used in local HadoopCatalog setup
scala> spark.sql(s"SELECT COUNT(*) from default.t1.snapshots").show()

25/07/15 13:06:16 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.ClassCastException: class org.apache.iceberg.spark.source.StructInternalRow cannot be cast to class org.apache.spark.sql.vectorized.ColumnarBatch (org.apache.iceberg.spark.source.StructInternalRow is in unnamed module of loader scala.reflect.internal.util.ScalaClassLoader$URLClassLoader @19ac93d2; org.apache.spark.sql.vectorized.ColumnarBatch is in unnamed module of loader 'app')
	at org.apache.spark.sql.comet.CometBatchScanExec$$anon$1.next(CometBatchScanExec.scala:68)
	at org.apache.spark.sql.comet.CometBatchScanExec$$anon$1.next(CometBatchScanExec.scala:57)
	at org.apache.comet.CometBatchIterator.hasNext(CometBatchIterator.java:51)
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2(CometExecIterator.scala:155)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2$adapted(CometExecIterator.scala:154)
	at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:157)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:154)
	at org.apache.comet.Tracing$.withTrace(Tracing.scala:31)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:152)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:203)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.comet.CometBatchIterator.hasNext(CometBatchIterator.java:50)
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2(CometExecIterator.scala:155)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2$adapted(CometExecIterator.scala:154)
	at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:157)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:154)
	at org.apache.comet.Tracing$.withTrace(Tracing.scala:31)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:152)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:203)
	at org.apache.spark.sql.comet.execution.shuffle.CometNativeShuffleWriter.write(CometNativeShuffleWriter.scala:106)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

@parthchandra (Contributor) commented:

@hsiang-c created #2033 to track this issue.

5 participants