@@ -971,7 +971,10 @@ case class CollapseCodegenStages(
// The whole-stage-codegen framework is row-based. If a plan supports columnar execution,
// it can't support whole-stage-codegen at the same time.
assert(!plan.supportsColumnar)
WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
val newId = codegenStageCounter.incrementAndGet()
val newPlan = WholeStageCodegenExec(insertInputAdapter(plan))(newId)
plan.logicalLink.foreach(newPlan.setLogicalLink)

Member Author:
It appears that WholeStageCodegenExec misses setting logicalLink; is that by design?

Contributor:
Interesting, and it never caused issues with AQE before?

Member Author:
Haven't seen real issues in either production or the existing UTs.

Member Author:
@cloud-fan if I revert the changes in FileFormatWriter.scala, this is not required.

Do you want me to keep it or revert it?

Contributor:
Let's revert it to be safe; the logical plan is quite sensitive to AQE. And technically, the node generated by CollapseCodegenStages is newly created at the planning phase, so it doesn't have a corresponding logical plan.

Member Author:
@cloud-fan reverted, and thanks for the explanation.

newPlan
case other =>
other.withNewChildren(other.children.map(insertWholeStageCodegen))
}
@@ -439,9 +439,7 @@ case class InMemoryRelation(
override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)

override def doCanonicalize(): logical.LogicalPlan =
copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
cacheBuilder,
outputOrdering)
withOutput(output.map(QueryPlan.normalizeExpressions(_, output)))

@transient val partitionStatistics = new PartitionStatistics(output)

@@ -469,8 +467,13 @@ case class InMemoryRelation(
}
}

def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
val map = AttributeMap(output.zip(newOutput))
val newOutputOrdering = outputOrdering
.map(_.transform { case a: Attribute => map(a) })
.asInstanceOf[Seq[SortOrder]]
InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, statsOfPlanToCache)

Member Author (on lines +470 to +475):
The issue was identified in a previous attempt; see #52474 (comment)

}
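
For context, here is a minimal standalone sketch of the remapping technique used in withOutput above (the attribute names are illustrative only, and the snippet is an editorial sketch rather than part of the change): AttributeMap keys by exprId, so a SortOrder captured against the old output can be rewritten to reference the new attributes.

import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeMap, AttributeReference, SortOrder}
import org.apache.spark.sql.types.IntegerType

// Old and new outputs: same name, different exprIds, as produced when
// canonicalization normalizes the relation's output attributes.
val oldI = AttributeReference("i", IntegerType)()
val newI = AttributeReference("i", IntegerType)()
val attrMap = AttributeMap(Seq(oldI -> newI))

// Rewrite an ordering that still references the old attribute, the same way
// withOutput does, so it stays consistent with the new output.
val oldOrdering = Seq(SortOrder(oldI, Ascending))
val newOrdering = oldOrdering
  .map(_.transform { case a: Attribute => attrMap(a) })
  .asInstanceOf[Seq[SortOrder]]
assert(newOrdering.head.child.semanticEquals(newI))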

override def newInstance(): this.type = {
InMemoryRelation(
@@ -487,6 +490,12 @@ case class InMemoryRelation(
cloned
}

override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = {
val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation]
copied.statsOfPlanToCache = this.statsOfPlanToCache

Member Author:
Ditto, the issue was identified in a previous attempt; see #52474 (comment)

copied
}

override def simpleString(maxFields: Int): String =
s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"

@@ -138,10 +138,6 @@ object FileFormatWriter extends Logging {
statsTrackers = statsTrackers
)

// We should first sort by dynamic partition columns, then bucket id, and finally sorting
// columns.
val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)

// SPARK-40588: when planned writing is disabled and AQE is enabled,
@@ -157,6 +153,23 @@ object FileFormatWriter extends Logging {
val actualOrdering = writeFilesOpt.map(_.child)
.getOrElse(materializeAdaptiveSparkPlan(plan))
.outputOrdering

val requiredOrdering = {

Contributor:
is this the code path when planned write is disabled?

Contributor:
I think we can leave it unfixed, as this code path is rarely reached and this fix is kind of an optimization: it's only about perf.

Member Author:
It's a necessary change for "planned write" to make the UTs happy.

if (Utils.isTesting) outputOrderingMatched = orderingMatched

Contributor:
OK, this is necessary for the current codebase, but do we really need it in theory? The planned write should have added the sort already; ideally we don't need to try to add the sort again here.

Member Author (@pan3793, Oct 13, 2025):

The planned write should have added the sort already, ideally we don't need to try to add sort again here.

yes, exactly

// We should first sort by dynamic partition columns, then bucket id, and finally sorting
// columns.
val sortCols = partitionColumns.drop(numStaticPartitionCols) ++
writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
val ordering = sortCols.map(SortOrder(_, Ascending))
plan.logicalLink match {

Contributor:
I'm a bit worried about this. In AQE we have a fallback to find the logical link in the children, so that it's more reliable. Now we have the risk of a perf regression if the logical link is not present and we add an extra sort.

Shall we remove adding the sort here completely if planned write is enabled (WriteFiles is present)?

Member Author (@pan3793, Oct 14, 2025):

I'm a bit worried about this. In AQE we have a fallback to find logical link in the children, so that it's more reliable.

@cloud-fan do you suggest

- plan.logicalLink match {
+ plan.logicalLink.orElse {
+   plan.collectFirst { case p if p.logicalLink.isDefined => p.logicalLink.get }
+ } match {

Shall we remove the adding sort here completly if planned write is enabled (WriteFiles is present)?

I think the current code already satisfies your expectation when planned write is enabled:

  1. if concurrent writer is disabled, the calculated required ordering won't be used.
  2. if concurrent writer is enabled, the calculated required ordering is only used in the concurrent writer step 2.

/**
* Dynamic partition writer with concurrent writers, meaning multiple concurrent writers are opened
* for writing.
*
* The process has the following steps:
* - Step 1: Maintain a map of output writers per each partition and/or bucket columns. Keep all
* writers opened and write rows one by one.
* - Step 2: If number of concurrent writers exceeds limit, sort rest of rows on partition and/or
* bucket column(s). Write rows one by one, and eagerly close the writer when finishing
* each partition and/or bucket.
*
* Caller is expected to call `writeWithIterator()` instead of `write()` to write records.
*/
class DynamicPartitionDataConcurrentWriter(

Member Author:
@cloud-fan I have updated the code to fall back to finding the logical link in the children, so setLogicalLink for WholeStageCodegenExec is unnecessary for this PR; please let me know whether you want me to keep it or revert it.
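
For reference, a rough sketch of such a fallback (hypothetical helper name, mirroring the suggested diff above; not necessarily the exact code that landed):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Prefer the node's own logical link; otherwise take the first link found
// among its descendants (e.g. when the node is a WholeStageCodegenExec
// wrapper that never had a link set on it).
def resolveLogicalLink(plan: SparkPlan): Option[LogicalPlan] =
  plan.logicalLink.orElse {
    plan.collectFirst { case p if p.logicalLink.isDefined => p.logicalLink.get }
  }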

case Some(WriteFiles(query, _, _, _, _, _)) =>
V1WritesUtils.eliminateFoldableOrdering(ordering, query).outputOrdering
case Some(query) =>

Member Author:
The query can be a WholeStageCodegenExec; that's why I set logicalLink on WholeStageCodegenExec.

V1WritesUtils.eliminateFoldableOrdering(ordering, query).outputOrdering
case _ =>
ordering
}
}

val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)

SQLExecution.checkSQLExecutionId(sparkSession)
@@ -201,7 +214,7 @@ object FileFormatWriter extends Logging {
description: WriteJobDescription,
committer: FileCommitProtocol,
outputSpec: OutputSpec,
requiredOrdering: Seq[Expression],
requiredOrdering: Seq[SortOrder],
partitionColumns: Seq[Attribute],
sortColumns: Seq[Attribute],
orderingMatched: Boolean): Set[String] = {
@@ -327,13 +340,12 @@ object FileFormatWriter extends Logging {

private def createSortPlan(
plan: SparkPlan,
requiredOrdering: Seq[Expression],
requiredOrdering: Seq[SortOrder],
outputSpec: OutputSpec): SortExec = {
// SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
// the physical plan may have different attribute ids due to optimizer removing some
// aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
val orderingExpr = bindReferences(
requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
val orderingExpr = bindReferences(requiredOrdering, outputSpec.outputColumns)
SortExec(
orderingExpr,
global = false,
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
import org.apache.spark.sql.catalyst.optimizer.{EliminateSorts, FoldablePropagation}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
@@ -97,12 +98,17 @@ object V1Writes extends Rule[LogicalPlan] {
assert(empty2NullPlan.output.length == query.output.length)
val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))

// Rewrite the attribute references in the required ordering to use the new output.
val requiredOrdering = write.requiredOrdering.map(_.transform {
case a: Attribute => attrMap.getOrElse(a, a)
}.asInstanceOf[SortOrder])
val outputOrdering = empty2NullPlan.outputOrdering
val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)
// Rewrite the attribute references in the required ordering to use the new output,
// then eliminate foldable ordering.
val requiredOrdering = {
val ordering = write.requiredOrdering.map(_.transform {
case a: Attribute => attrMap.getOrElse(a, a)
}.asInstanceOf[SortOrder])
eliminateFoldableOrdering(ordering, empty2NullPlan).outputOrdering
}
val outputOrdering = eliminateFoldableOrdering(
empty2NullPlan.outputOrdering, empty2NullPlan).outputOrdering
val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
if (orderingMatched) {
empty2NullPlan
} else {
@@ -199,15 +205,18 @@ object V1WritesUtils {
expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
}

def eliminateFoldableOrdering(ordering: Seq[SortOrder], query: LogicalPlan): LogicalPlan =

Contributor:
let's add comments to explain the reason behind it.

Member Author:
updated

EliminateSorts(FoldablePropagation(Sort(ordering, global = false, query)))
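
To illustrate the effect of the helper (an editorial sketch under assumed inputs; the relation and attribute names are made up), consider a query with a literal partition column: FoldablePropagation rewrites the sort key to the literal, and EliminateSorts then drops the now-foldable key, so only the genuinely required keys remain.

import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, AttributeReference, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.execution.datasources.V1WritesUtils
import org.apache.spark.sql.types.IntegerType

// Hypothetical query shape: SELECT i, j, '0' AS k, with required write
// ordering (k, i), mirroring the new tests below.
val i = AttributeReference("i", IntegerType)()
val j = AttributeReference("j", IntegerType)()
val kAlias = Alias(Literal("0"), "k")()
val query = Project(Seq(i, j, kAlias), LocalRelation(i, j))
val ordering = Seq(SortOrder(kAlias.toAttribute, Ascending), SortOrder(i, Ascending))

// The k key folds away, so the effective required ordering is just (i);
// a query that already did SORT BY k, i therefore keeps its ordering and
// FileFormatWriter does not need to insert an extra physical sort.
val simplified = V1WritesUtils.eliminateFoldableOrdering(ordering, query).outputOrdering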

def isOrderingMatched(
requiredOrdering: Seq[Expression],
requiredOrdering: Seq[SortOrder],
outputOrdering: Seq[SortOrder]): Boolean = {
if (requiredOrdering.length > outputOrdering.length) {
false
} else {
requiredOrdering.zip(outputOrdering).forall {
case (requiredOrder, outputOrder) =>
outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
outputOrder.satisfies(requiredOrder)
}
}
}
@@ -63,10 +63,23 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
hasLogicalSort: Boolean,
orderingMatched: Boolean,
hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
var optimizedPlan: LogicalPlan = null
executeAndCheckOrderingAndCustomValidate(
hasLogicalSort, orderingMatched, hasEmpty2Null)(query)(_ => ())
}

/**
* Execute a write query and check ordering of the plan, then do custom validation
*/
protected def executeAndCheckOrderingAndCustomValidate(
hasLogicalSort: Boolean,
orderingMatched: Boolean,
hasEmpty2Null: Boolean = false)(query: => Unit)(
customValidate: LogicalPlan => Unit): Unit = {
@volatile var optimizedPlan: LogicalPlan = null

val listener = new QueryExecutionListener {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
val conf = qe.sparkSession.sessionState.conf

Member Author:
This is a bugfix: the listener runs in another thread, so without this change conf.getConf would actually read the conf from the thread-local, which may cause issues when tests run concurrently.

qe.optimizedPlan match {
case w: V1WriteCommand =>
if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
@@ -87,7 +100,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper

// Check whether the output ordering is matched before FileFormatWriter executes rdd.
assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
s"Expect: $orderingMatched, Actual: ${FileFormatWriter.outputOrderingMatched}")
s"Expect orderingMatched: $orderingMatched, " +
s"Actual: ${FileFormatWriter.outputOrderingMatched}")

sparkContext.listenerBus.waitUntilEmpty()

Expand All @@ -103,6 +117,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
assert(empty2nullExpr == hasEmpty2Null,
s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr. Plan:\n$optimizedPlan")

customValidate(optimizedPlan)

spark.listenerManager.unregister(listener)
}
}
@@ -391,4 +407,31 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
}
}
}

test("v1 write with sort by literal column preserve custom order") {
withPlannedWrite { enabled =>
withTable("t") {
sql(
"""
|CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET
|PARTITIONED BY (k)
|""".stripMargin)
executeAndCheckOrderingAndCustomValidate(
hasLogicalSort = true, orderingMatched = true) {
sql(
"""
|INSERT OVERWRITE t
|SELECT i, j, '0' as k FROM t0 SORT BY k, i
|""".stripMargin)
} { optimizedPlan =>
assert {
optimizedPlan.outputOrdering.exists {
case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
case _ => false
}
}
}
}
}
}
}
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution.command

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
import org.apache.spark.sql.hive.HiveUtils._
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -126,4 +127,35 @@ class V1WriteHiveCommandSuite
}
}
}

test("v1 write to hive table with sort by literal column preserve custom order") {
withCovnertMetastore { _ =>
withPlannedWrite { enabled =>
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
withTable("t") {
sql(
"""
|CREATE TABLE t(i INT, j INT, k STRING) STORED AS PARQUET
|PARTITIONED BY (k)
|""".stripMargin)
executeAndCheckOrderingAndCustomValidate(
hasLogicalSort = true, orderingMatched = true) {
sql(
"""
|INSERT OVERWRITE t
|SELECT i, j, '0' as k FROM t0 SORT BY k, i
|""".stripMargin)
} { optimizedPlan =>
assert {
optimizedPlan.outputOrdering.exists {
case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
case _ => false
}
}
}
}
}
}
}
}
}