-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53469][SQL] Ability to cleanup shuffle in Thrift server #52213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 4 commits
5103f76
f7c5a78
b6914bb
2841ec3
23176ed
fee6ad9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEProp | |
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} | ||
import org.apache.spark.sql.types.{IntegerType, StructType} | ||
import org.apache.spark.util.Utils | ||
|
||
class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSparkSession | ||
with AdaptiveSparkPlanHelper { | ||
|
@@ -205,6 +206,9 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp | |
sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5") | ||
sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5") | ||
|
||
// Tests depend on intermediate results that would otherwise be cleaned up when | ||
|
||
// shuffle clean up is enabled, causing test failures. | ||
conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false) | ||
|
||
// `MergeScalarSubqueries` can duplicate subqueries in the optimized plan and would make testing | ||
// complicated. | ||
conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, MergeScalarSubqueries.ruleName) | ||
|
@@ -213,6 +217,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp | |
protected override def afterAll(): Unit = try { | ||
conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, | ||
SQLConf.OPTIMIZER_EXCLUDED_RULES.defaultValueString) | ||
conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, Utils.isTesting) | ||
|
||
sql("DROP TABLE IF EXISTS bf1") | ||
sql("DROP TABLE IF EXISTS bf2") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,18 @@ class AdaptiveQueryExecSuite | |
|
||
setupTestData() | ||
|
||
protected override def beforeAll(): Unit = { | ||
super.beforeAll() | ||
// Tests depend on intermediate results that would otherwise be cleaned up when | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ditto here, AQE is turned on by default now and let's make sure it's compatible with shuffle cleanup There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think my wording might not have been clear.
Specifically There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we disable cleanup only for that test case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an assertion method used by all tests. |
||
// shuffle clean up is enabled, causing test failures. | ||
sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
|
||
protected override def afterAll(): Unit = { | ||
sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, true) | ||
super.afterAll() | ||
} | ||
|
||
private def runAdaptiveAndVerifyResult(query: String, | ||
skipCheckAnswer: Boolean = false): (SparkPlan, SparkPlan) = { | ||
var finalPlanCnt = 0 | ||
|
@@ -1871,9 +1883,7 @@ class AdaptiveQueryExecSuite | |
} | ||
|
||
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", | ||
SQLConf.SHUFFLE_PARTITIONS.key -> "5", | ||
// Disabling cleanup as the test assertions depend on them | ||
SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") { | ||
SQLConf.SHUFFLE_PARTITIONS.key -> "5") { | ||
val df = sql( | ||
""" | ||
|SELECT * FROM ( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,8 +30,7 @@ import org.apache.spark.internal.Logging | |
import org.apache.spark.internal.LogKeys.COMMAND | ||
import org.apache.spark.sql.SparkSession | ||
import org.apache.spark.sql.catalyst.plans.logical.CommandResult | ||
import org.apache.spark.sql.classic.ClassicConversions._ | ||
import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution} | ||
import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution} | ||
import org.apache.spark.sql.execution.HiveResult.hiveResultString | ||
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} | ||
import org.apache.spark.util.Utils | ||
|
@@ -67,7 +66,7 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv. | |
new VariableSubstitution().substitute(command) | ||
} | ||
sparkSession.sparkContext.setJobDescription(substitutorCommand) | ||
val execution = sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan) | ||
val execution = sparkSession.sql(command).queryExecution | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So this PR does not enable shuffle cleanup in thriftserver yet? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I am not sure if I fully understand. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yea, so it's more like a bug fix to make thriftserver respect this config, but the config is off by default so thriftserver does not do shuffle cleanup by default. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, that's right |
||
// The SQL command has been executed above via `executePlan`, therefore we don't need to | ||
// wrap it again with a new execution ID when getting Hive result. | ||
execution.logical match { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SparkExecuteStatementOperation's code eventually leads here.