@@ -496,7 +496,8 @@ class SparkSession private(
parsedPlan
}
}
Dataset.ofRows(self, plan, tracker)
Dataset.ofRows(self, plan, tracker,
Contributor Author:
SparkExecuteStatementOperation's code eventually leads here.

QueryExecution.determineShuffleCleanupMode(sessionState.conf))
}

/** @inheritdoc */
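
For readers tracing the change: the hunk above now passes a shuffle-cleanup mode into Dataset.ofRows via QueryExecution.determineShuffleCleanupMode(sessionState.conf). A minimal sketch of what such a helper plausibly does, assuming mode objects named DoNotCleanup and RemoveShuffleFiles and only the single flag used later in this diff (the real implementation may consult more configs):

  // Sketch only, not the verbatim Spark code; the mode objects and the exact
  // signature are assumptions. SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED
  // is the flag that appears later in this diff.
  def determineShuffleCleanupMode(conf: SQLConf): ShuffleCleanupMode = {
    if (conf.getConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
      RemoveShuffleFiles // eagerly remove shuffle files once the query finishes
    } else {
      DoNotCleanup // default: leave shuffle files to the regular ContextCleaner
    }
  }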
@@ -67,6 +67,18 @@ class AdaptiveQueryExecSuite

setupTestData()

protected override def beforeAll(): Unit = {
super.beforeAll()
// Tests depend on intermediate results that would otherwise be cleaned up when
Contributor:
Ditto here: AQE is turned on by default now, and let's make sure it's compatible with shuffle cleanup.

Contributor Author:
I think my wording might not have been clear.
The query execution indeed succeeds and there is no correctness issue.
The issue is with how we assert in tests,
in org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite#checkNumLocalShuffleReads:

  private def checkNumLocalShuffleReads(
      plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
    val numShuffles = collect(plan) {
      case s: ShuffleQueryStageExec => s
    }.length

    val numLocalReads = collect(plan) {
      case read: AQEShuffleReadExec if read.isLocalRead => read
    }
    numLocalReads.foreach { r =>
      val rdd = r.execute()
      val parts = rdd.partitions
      assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
    }
    assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead))
  }

Specifically, rdd.preferredLocations(_) will be empty after the cleanup and the assertion fails.
If shuffle cleanup is enabled, I don't think this assertion should pass.

Contributor:
Can we disable cleanup only for that test case?

Contributor Author:
This is an assertion method used by all tests.
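
For illustration, per-test disabling would look roughly like the sketch below (hypothetical, not part of this PR; the suite-wide beforeAll/afterAll approach was used instead because checkNumLocalShuffleReads is shared by many tests). The query is a placeholder over the standard testData tables:

  test("hypothetical: local shuffle reads with cleanup disabled for one test") {
    withSQLConf(
        SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
      // With cleanup off, shuffle outputs survive the query, so
      // rdd.preferredLocations stays non-empty and the assertion can run.
      val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
        "SELECT * FROM testData JOIN testData2 ON key = a")
      checkNumLocalShuffleReads(adaptivePlan)
    }
  }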

// shuffle clean up is enabled, causing test failures.
sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
Contributor:
ditto

Contributor Author:
done

}

protected override def afterAll(): Unit = {
sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, true)
super.afterAll()
}

private def runAdaptiveAndVerifyResult(query: String,
skipCheckAnswer: Boolean = false): (SparkPlan, SparkPlan) = {
var finalPlanCnt = 0
@@ -1871,9 +1883,7 @@ class AdaptiveQueryExecSuite
}

withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
// Disabling cleanup as the test assertions depend on them
SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
val df = sql(
"""
|SELECT * FROM (
@@ -30,8 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.COMMAND
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.CommandResult
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
import org.apache.spark.sql.execution.HiveResult.hiveResultString
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
import org.apache.spark.util.Utils
@@ -67,7 +66,7 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
new VariableSubstitution().substitute(command)
}
sparkSession.sparkContext.setJobDescription(substitutorCommand)
val execution = sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan)
val execution = sparkSession.sql(command).queryExecution
Contributor:
So this PR does not enable shuffle cleanup in thriftserver yet?

Contributor Author (@karuppayya, Sep 16, 2025):
I am not sure if I fully understand.
But on why I changed this line:
Before: the query execution object was created with the default DoNotCleanup mode, without honoring the shuffle cleanup configs.
After: the query execution is created via the Dataset APIs, where the configs are honored when set, giving an opportunity to clean up the generated shuffles after the SQL completes.
Did I answer your question?
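
A side-by-side sketch of that contrast, built from the two lines in this diff (the val names are changed only so both variants can be shown together; the comments restate the explanation above rather than verified internals):

  // Before: builds the QueryExecution directly, so shuffleCleanupMode stays at
  // the default DoNotCleanup and the cleanup configs are never consulted.
  val executionBefore =
    sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan)

  // After: goes through the Dataset API, which (per the SparkSession change in
  // this PR) passes QueryExecution.determineShuffleCleanupMode(sessionState.conf)
  // into Dataset.ofRows, so a configured cleanup mode is honored.
  val executionAfter = sparkSession.sql(command).queryExecution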

Contributor:

Yeah, so it's more like a bug fix to make the thriftserver respect this config, but the config is off by default, so the thriftserver does not do shuffle cleanup by default.

Contributor Author:
Yes, that's right.
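
So to actually get cleanup through the Thrift server, the flag still has to be turned on explicitly; for example (assuming a classic SparkSession named spark):

  // Off by default; opting in enables shuffle-file cleanup for queries run
  // through this session, including ones submitted via the Thrift server,
  // which now takes the Dataset path shown above.
  spark.conf.set(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, "true")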

// The SQL command has been executed above via `executePlan`, therefore we don't need to
// wrap it again with a new execution ID when getting Hive result.
execution.logical match {