[SPARK-53738][SQL] Fix planned write when query output contains foldable orderings #52584
@@ -439,9 +439,7 @@ case class InMemoryRelation(

   override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)

   override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      cacheBuilder,
-      outputOrdering)
+    withOutput(output.map(QueryPlan.normalizeExpressions(_, output)))

   @transient val partitionStatistics = new PartitionStatistics(output)
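For context, a minimal sketch (not part of this PR) of what `QueryPlan.normalizeExpressions` does during canonicalization. It assumes spark-catalyst on the classpath; the object name is invented for illustration:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.types.IntegerType

object CanonicalizeSketch extends App {
  // Two attributes with fresh, arbitrary expression IDs.
  val a = AttributeReference("a", IntegerType)()
  val b = AttributeReference("b", IntegerType)()
  val output = Seq(a, b)

  // Each attribute's exprId is rewritten to its ordinal in `output`, so two
  // semantically equal plans canonicalize to the same tree regardless of the
  // exprIds they were analyzed with.
  val normalized = output.map(QueryPlan.normalizeExpressions(_, output))
  println(normalized.map(_.exprId.id)) // expected: List(0, 1)
}
```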
@@ -469,8 +467,13 @@ case class InMemoryRelation(
     }
   }

-  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
-    InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
+  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
+    val map = AttributeMap(output.zip(newOutput))
+    val newOutputOrdering = outputOrdering
+      .map(_.transform { case a: Attribute => map(a) })
+      .asInstanceOf[Seq[SortOrder]]
+    InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, statsOfPlanToCache)
+  }

Review comment on lines +470 to +475: issue was identified in the previous try, see #52474 (comment)
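The essence of the change above: when the relation's output attributes are replaced, the cached `outputOrdering` must be remapped too, or it keeps pointing at stale exprIds. A minimal sketch, assuming spark-catalyst on the classpath (object name invented):

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeMap, AttributeReference, SortOrder}
import org.apache.spark.sql.types.IntegerType

object WithOutputSketch extends App {
  // Old and new outputs: same name, different exprIds, as after re-analysis.
  val oldI = AttributeReference("i", IntegerType)()
  val newI = AttributeReference("i", IntegerType)()
  val oldOutput: Seq[Attribute] = Seq(oldI)
  val newOutput: Seq[Attribute] = Seq(newI)

  // The cached ordering still references the old attribute.
  val outputOrdering = Seq(SortOrder(oldI, Ascending))

  // Positional remap, mirroring the fixed withOutput.
  val map = AttributeMap(oldOutput.zip(newOutput))
  val newOrdering = outputOrdering
    .map(_.transform { case a: Attribute => map(a) })
    .asInstanceOf[Seq[SortOrder]]

  assert(newOrdering.head.child.asInstanceOf[Attribute].exprId == newI.exprId)
}
```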
   override def newInstance(): this.type = {
     InMemoryRelation(

@@ -487,6 +490,12 @@ case class InMemoryRelation(
     cloned
   }

+  override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = {
+    val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation]
+    copied.statsOfPlanToCache = this.statsOfPlanToCache
+    copied
+  }

Review comment: ditto, issue was identified in the previous try, see #52474 (comment)

   override def simpleString(maxFields: Int): String =
     s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
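The `makeCopy` override is needed because `statsOfPlanToCache` is mutable state outside the constructor, and tree copies only re-invoke the constructor. A plain-Scala sketch of the pitfall (the `Node` class is hypothetical):

```scala
// Hypothetical stand-in for a plan node carrying non-constructor state.
case class Node(name: String) {
  var stats: Option[Long] = None // populated after construction, like statsOfPlanToCache
}

object MakeCopySketch extends App {
  val original = Node("cached")
  original.stats = Some(42L)

  // A constructor-based copy (what makeCopy effectively performs) drops the field.
  val copied = original.copy()
  assert(copied.stats.isEmpty)

  // The override restores it by hand, as the diff above does.
  copied.stats = original.stats
  assert(copied.stats.contains(42L))
}
```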
@@ -138,10 +138,6 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )

-    // We should first sort by dynamic partition columns, then bucket id, and finally sorting
-    // columns.
-    val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
-      writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
     val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)

     // SPARK-40588: when planned writing is disabled and AQE is enabled,
@@ -157,6 +153,23 @@ object FileFormatWriter extends Logging {
     val actualOrdering = writeFilesOpt.map(_.child)
       .getOrElse(materializeAdaptiveSparkPlan(plan))
       .outputOrdering

+    val requiredOrdering = {
+      // We should first sort by dynamic partition columns, then bucket id, and finally sorting
+      // columns.
+      val sortCols = partitionColumns.drop(numStaticPartitionCols) ++
+        writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
+      val ordering = sortCols.map(SortOrder(_, Ascending))
+      plan.logicalLink match {
Quoted from the diff context:

/**
 * Dynamic partition writer with concurrent writers, meaning multiple concurrent writers are opened
 * for writing.
 *
 * The process has the following steps:
 *  - Step 1: Maintain a map of output writers per each partition and/or bucket columns. Keep all
 *    writers opened and write rows one by one.
 *  - Step 2: If number of concurrent writers exceeds limit, sort rest of rows on partition and/or
 *    bucket column(s). Write rows one by one, and eagerly close the writer when finishing
 *    each partition and/or bucket.
 *
 * Caller is expected to call `writeWithIterator()` instead of `write()` to write records.
 */
class DynamicPartitionDataConcurrentWriter(
Review comment: @cloud-fan I have updated the code to fall back to finding the logical link in the children, so setLogicalLink for WholeStageCodegenExec is unnecessary for this PR. Please let me know if you want me to keep it or restore it.

Review comment (outdated): the query can be WholeStageCodegenExec, that's why I set logicalLink on WholeStageCodegenExec.

peter-toth marked this conversation as resolved.
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources

 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.optimizer.{EliminateSorts, FoldablePropagation}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -97,12 +98,17 @@ object V1Writes extends Rule[LogicalPlan] {
     assert(empty2NullPlan.output.length == query.output.length)
     val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))

-    // Rewrite the attribute references in the required ordering to use the new output.
-    val requiredOrdering = write.requiredOrdering.map(_.transform {
-      case a: Attribute => attrMap.getOrElse(a, a)
-    }.asInstanceOf[SortOrder])
-    val outputOrdering = empty2NullPlan.outputOrdering
-    val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)
+    // Rewrite the attribute references in the required ordering to use the new output,
+    // then eliminate foldable ordering.
+    val requiredOrdering = {
+      val ordering = write.requiredOrdering.map(_.transform {
+        case a: Attribute => attrMap.getOrElse(a, a)
+      }.asInstanceOf[SortOrder])
+      eliminateFoldableOrdering(ordering, empty2NullPlan).outputOrdering
+    }
+    val outputOrdering = eliminateFoldableOrdering(
+      empty2NullPlan.outputOrdering, empty2NullPlan).outputOrdering
+    val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
     if (orderingMatched) {
       empty2NullPlan
     } else {

peter-toth marked this conversation as resolved.
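To see why routing the ordering through `eliminateFoldableOrdering` fixes the bug, here is a hedged, self-contained sketch using the catalyst DSL (object name invented; `eliminateFoldableOrdering` itself is defined in the next hunk). With `k` aliasing a literal, `FoldablePropagation` turns the first sort key into a constant and `EliminateSorts` drops it, leaving only `i` to be matched:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder}
import org.apache.spark.sql.catalyst.optimizer.{EliminateSorts, FoldablePropagation}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Sort}

object FoldableOrderingSketch extends App {
  val i = Symbol("i").int
  val j = Symbol("j").int
  // SELECT i, j, '0' AS k ... SORT BY k, i  -- `k` aliases a foldable literal.
  val plan = LocalRelation(i, j).select(i, j, Literal("0").as("k"))
  val k = plan.output.last
  val ordering = Seq(SortOrder(k, Ascending), SortOrder(i, Ascending))

  // Mirrors V1WritesUtils.eliminateFoldableOrdering from this PR.
  val pruned = EliminateSorts(FoldablePropagation(Sort(ordering, global = false, plan)))

  // The constant `k` key is gone; only the ordering on `i` survives.
  println(pruned.outputOrdering.map(_.child))
}
```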
@@ -199,15 +205,18 @@ object V1WritesUtils {
     expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
   }

+  def eliminateFoldableOrdering(ordering: Seq[SortOrder], query: LogicalPlan): LogicalPlan =
+    EliminateSorts(FoldablePropagation(Sort(ordering, global = false, query)))

   def isOrderingMatched(
-      requiredOrdering: Seq[Expression],
+      requiredOrdering: Seq[SortOrder],
       outputOrdering: Seq[SortOrder]): Boolean = {
     if (requiredOrdering.length > outputOrdering.length) {
       false
     } else {
       requiredOrdering.zip(outputOrdering).forall {
         case (requiredOrder, outputOrder) =>
-          outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
+          outputOrder.satisfies(requiredOrder)
       }
     }
   }

Review comment (on eliminateFoldableOrdering): let's add comments to explain the reason behind it.
Reply: updated.

peter-toth marked this conversation as resolved.
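The `isOrderingMatched` change compares full `SortOrder`s instead of bare child expressions. A hedged sketch of `SortOrder.satisfies` semantics, assuming spark-catalyst (object name invented):

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, SortOrder}
import org.apache.spark.sql.types.IntegerType

object OrderingMatchSketch extends App {
  val i = AttributeReference("i", IntegerType)()
  val outputAsc = SortOrder(i, Ascending)

  // Same child and direction: the required ordering is satisfied.
  assert(outputAsc.satisfies(SortOrder(i, Ascending)))

  // Different direction: not satisfied. The old copy-based call rebuilt the
  // required order with the output's own direction, so a direction mismatch
  // could never be observed through it.
  assert(!outputAsc.satisfies(SortOrder(i, Descending)))
}
```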
@@ -63,10 +63,23 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
       hasLogicalSort: Boolean,
       orderingMatched: Boolean,
       hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
-    var optimizedPlan: LogicalPlan = null
+    executeAndCheckOrderingAndCustomValidate(
+      hasLogicalSort, orderingMatched, hasEmpty2Null)(query)(_ => ())
+  }
+
+  /**
+   * Execute a write query and check ordering of the plan, then do custom validation.
+   */
+  protected def executeAndCheckOrderingAndCustomValidate(
+      hasLogicalSort: Boolean,
+      orderingMatched: Boolean,
+      hasEmpty2Null: Boolean = false)(query: => Unit)(
+      customValidate: LogicalPlan => Unit): Unit = {
+    @volatile var optimizedPlan: LogicalPlan = null

     val listener = new QueryExecutionListener {
       override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
         val conf = qe.sparkSession.sessionState.conf

Review comment: this is a bugfix; the listener runs in another thread, and without this change the test thread may not see the `optimizedPlan` written by the listener.

         qe.optimizedPlan match {
           case w: V1WriteCommand =>
             if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
@@ -87,7 +100,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper

       // Check whether the output ordering is matched before FileFormatWriter executes rdd.
       assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
-        s"Expect: $orderingMatched, Actual: ${FileFormatWriter.outputOrderingMatched}")
+        s"Expect orderingMatched: $orderingMatched, " +
+          s"Actual: ${FileFormatWriter.outputOrderingMatched}")

       sparkContext.listenerBus.waitUntilEmpty()
@@ -103,6 +117,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
     assert(empty2nullExpr == hasEmpty2Null,
       s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr. Plan:\n$optimizedPlan")

+    customValidate(optimizedPlan)

     spark.listenerManager.unregister(listener)
   }
 }
@@ -391,4 +407,31 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
       }
     }
   }
+
+  test("v1 write with sort by literal column preserve custom order") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET
+            |PARTITIONED BY (k)
+            |""".stripMargin)
+        executeAndCheckOrderingAndCustomValidate(
+          hasLogicalSort = true, orderingMatched = true) {
+          sql(
+            """
+              |INSERT OVERWRITE t
+              |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+              |""".stripMargin)
+        } { optimizedPlan =>
+          assert {
+            optimizedPlan.outputOrdering.exists {
+              case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+              case _ => false
+            }
+          }
+        }
+      }
+    }
+  }
 }
Review comment: It appears that WholeStageCodegenExec misses setting logicalLink. Is that by design?

Reply: Interesting, and it never caused an issue with AQE before?

Reply: I haven't seen real issues in either production or the existing UTs.

Reply: @cloud-fan if I revert the changes in FileFormatWriter.scala, this is not required. Do you want me to keep it or revert it?

Reply: Let's revert it to be safe; the logical plan is quite sensitive to AQE. And technically, the node that CollapseCodegenStages generates at the planning phase is new, so it does not have a corresponding logical plan.

Reply: @cloud-fan reverted, and thanks for the explanation.