From 2c66ba9a84b37dcc28447b77bb1d8d51c6e5bc60 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Mon, 13 Oct 2025 13:39:03 +0800
Subject: [PATCH 1/7] [SPARK-53738][SQL] PlannedWrite should preserve custom sort order when query output contains literal

---
 .../sql/execution/WholeStageCodegenExec.scala |  5 +-
 .../execution/columnar/InMemoryRelation.scala | 19 ++++++--
 .../datasources/FileFormatWriter.scala        | 28 +++++++----
 .../sql/execution/datasources/V1Writes.scala  | 25 ++++++----
 .../datasources/V1WriteCommandSuite.scala     | 47 ++++++++++++++++++-
 .../command/V1WriteHiveCommandSuite.scala     | 32 +++++++++++++
 6 files changed, 132 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 246508965d3d6..4c21ac041d62e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -971,7 +971,10 @@ case class CollapseCodegenStages(
       // The whole-stage-codegen framework is row-based. If a plan supports columnar execution,
       // it can't support whole-stage-codegen at the same time.
       assert(!plan.supportsColumnar)
-      WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
+      val newId = codegenStageCounter.incrementAndGet()
+      val newPlan = WholeStageCodegenExec(insertInputAdapter(plan))(newId)
+      plan.logicalLink.foreach(newPlan.setLogicalLink)
+      newPlan
     case other =>
       other.withNewChildren(other.children.map(insertWholeStageCodegen))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index eabbc7fc74f50..bf7491625fa03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -439,9 +439,7 @@ case class InMemoryRelation(
   override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)

   override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      cacheBuilder,
-      outputOrdering)
+    withOutput(output.map(QueryPlan.normalizeExpressions(_, output)))

   @transient val partitionStatistics = new PartitionStatistics(output)

@@ -469,8 +467,13 @@ case class InMemoryRelation(
     }
   }

-  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
-    InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
+  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
+    val map = AttributeMap(output.zip(newOutput))
+    val newOutputOrdering = outputOrdering
+      .map(_.transform { case a: Attribute => map(a) })
+      .asInstanceOf[Seq[SortOrder]]
+    InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, statsOfPlanToCache)
+  }

   override def newInstance(): this.type = {
     InMemoryRelation(
@@ -487,6 +490,12 @@ case class InMemoryRelation(
     cloned
   }

+  override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = {
+    val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation]
+    copied.statsOfPlanToCache = this.statsOfPlanToCache
+    copied
+  }
+
   override def simpleString(maxFields: Int): String =
     s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 55e2271dc058b..8f8dd0367f8ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -138,10 +138,6 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )

-    // We should first sort by dynamic partition columns, then bucket id, and finally sorting
-    // columns.
-    val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
-      writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
     val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)

     // SPARK-40588: when planned writing is disabled and AQE is enabled,
@@ -157,6 +153,23 @@ object FileFormatWriter extends Logging {
     val actualOrdering = writeFilesOpt.map(_.child)
       .getOrElse(materializeAdaptiveSparkPlan(plan))
       .outputOrdering
+
+    val requiredOrdering = {
+      // We should first sort by dynamic partition columns, then bucket id, and finally sorting
+      // columns.
+      val sortCols = partitionColumns.drop(numStaticPartitionCols) ++
+        writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
+      val ordering = sortCols.map(SortOrder(_, Ascending))
+      plan.logicalLink match {
+        case Some(WriteFiles(query, _, _, _, _, _)) =>
+          V1WritesUtils.eliminateFoldableOrdering(ordering, query).outputOrdering
+        case Some(query) =>
+          V1WritesUtils.eliminateFoldableOrdering(ordering, query).outputOrdering
+        case _ =>
+          ordering
+      }
+    }
+
     val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)

     SQLExecution.checkSQLExecutionId(sparkSession)
@@ -201,7 +214,7 @@ object FileFormatWriter extends Logging {
       description: WriteJobDescription,
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
-      requiredOrdering: Seq[Expression],
+      requiredOrdering: Seq[SortOrder],
       partitionColumns: Seq[Attribute],
       sortColumns: Seq[Attribute],
       orderingMatched: Boolean): Set[String] = {
@@ -327,13 +340,12 @@ object FileFormatWriter extends Logging {

   private def createSortPlan(
       plan: SparkPlan,
-      requiredOrdering: Seq[Expression],
+      requiredOrdering: Seq[SortOrder],
       outputSpec: OutputSpec): SortExec = {
     // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
     // the physical plan may have different attribute ids due to optimizer removing some
     // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
-    val orderingExpr = bindReferences(
-      requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
+    val orderingExpr = bindReferences(requiredOrdering, outputSpec.outputColumns)
     SortExec(
       orderingExpr,
       global = false,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index 280fe1068d814..e64f03649986f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.optimizer.{EliminateSorts, FoldablePropagation}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -97,12 +98,17 @@ object V1Writes extends Rule[LogicalPlan] {
     assert(empty2NullPlan.output.length == query.output.length)
     val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))

-    // Rewrite the attribute references in the required ordering to use the new output.
-    val requiredOrdering = write.requiredOrdering.map(_.transform {
-      case a: Attribute => attrMap.getOrElse(a, a)
-    }.asInstanceOf[SortOrder])
-    val outputOrdering = empty2NullPlan.outputOrdering
-    val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)
+    // Rewrite the attribute references in the required ordering to use the new output,
+    // then eliminate foldable ordering.
+    val requiredOrdering = {
+      val ordering = write.requiredOrdering.map(_.transform {
+        case a: Attribute => attrMap.getOrElse(a, a)
+      }.asInstanceOf[SortOrder])
+      eliminateFoldableOrdering(ordering, empty2NullPlan).outputOrdering
+    }
+    val outputOrdering = eliminateFoldableOrdering(
+      empty2NullPlan.outputOrdering, empty2NullPlan).outputOrdering
+    val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
     if (orderingMatched) {
       empty2NullPlan
     } else {
@@ -199,15 +205,18 @@ object V1WritesUtils {
     expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
   }

+  def eliminateFoldableOrdering(ordering: Seq[SortOrder], query: LogicalPlan): LogicalPlan =
+    EliminateSorts(FoldablePropagation(Sort(ordering, global = false, query)))
+
   def isOrderingMatched(
-      requiredOrdering: Seq[Expression],
+      requiredOrdering: Seq[SortOrder],
       outputOrdering: Seq[SortOrder]): Boolean = {
     if (requiredOrdering.length > outputOrdering.length) {
       false
     } else {
       requiredOrdering.zip(outputOrdering).forall {
         case (requiredOrder, outputOrder) =>
-          outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
+          outputOrder.satisfies(requiredOrder)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index 80d771428d909..9d4f8b99bc27f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -63,10 +63,23 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
       hasLogicalSort: Boolean,
       orderingMatched: Boolean,
       hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
-    var optimizedPlan: LogicalPlan = null
+    executeAndCheckOrderingAndCustomValidate(
+      hasLogicalSort, orderingMatched, hasEmpty2Null)(query)(_ => ())
+  }
+
+  /**
+   * Execute a write query and check ordering of the plan, then do custom validation
+   */
+  protected def executeAndCheckOrderingAndCustomValidate(
+      hasLogicalSort: Boolean,
+      orderingMatched: Boolean,
+      hasEmpty2Null: Boolean = false)(query: => Unit)(
+      customValidate: LogicalPlan => Unit): Unit = {
+    @volatile var optimizedPlan: LogicalPlan = null

     val listener = new QueryExecutionListener {
       override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        val conf = qe.sparkSession.sessionState.conf
         qe.optimizedPlan match {
           case w: V1WriteCommand =>
             if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
@@ -87,7 +100,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper

     // Check whether the output ordering is matched before FileFormatWriter executes rdd.
     assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
-      s"Expect: $orderingMatched, Actual: ${FileFormatWriter.outputOrderingMatched}")
+      s"Expect orderingMatched: $orderingMatched, " +
+        s"Actual: ${FileFormatWriter.outputOrderingMatched}")

     sparkContext.listenerBus.waitUntilEmpty()

@@ -103,6 +117,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
     assert(empty2nullExpr == hasEmpty2Null,
       s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr. Plan:\n$optimizedPlan")

+    customValidate(optimizedPlan)
+
     spark.listenerManager.unregister(listener)
   }

@@ -391,4 +407,31 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
       }
     }
   }
+
+  test("v1 write with sort by literal column preserve custom order") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET
+            |PARTITIONED BY (k)
+            |""".stripMargin)
+        executeAndCheckOrderingAndCustomValidate(
+          hasLogicalSort = true, orderingMatched = true) {
+          sql(
+            """
+              |INSERT OVERWRITE t
+              |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+              |""".stripMargin)
+        } { optimizedPlan =>
+          assert {
+            optimizedPlan.outputOrdering.exists {
+              case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+              case _ => false
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
index e0e056be5987c..b46ac5da0762e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution.command

 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
 import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
 import org.apache.spark.sql.hive.HiveUtils._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -126,4 +127,35 @@ class V1WriteHiveCommandSuite
       }
     }
   }
+
+  test("v1 write to hive table with sort by literal column preserve custom order") {
+    withCovnertMetastore { _ =>
+      withPlannedWrite { enabled =>
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          withTable("t") {
+            sql(
+              """
+                |CREATE TABLE t(i INT, j INT, k STRING) STORED AS PARQUET
+                |PARTITIONED BY (k)
+                |""".stripMargin)
+            executeAndCheckOrderingAndCustomValidate(
+              hasLogicalSort = true, orderingMatched = true) {
+              sql(
+                """
+                  |INSERT OVERWRITE t
+                  |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+                  |""".stripMargin)
+            } { optimizedPlan =>
+              assert {
+                optimizedPlan.outputOrdering.exists {
+                  case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+                  case _ => false
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }

From a1bf09e03e24b093088217a11896e11248a34636 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Mon, 13 Oct 2025 14:51:39 +0800
Subject: [PATCH 2/7] fix codegenStageCounter

---
 .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 4c21ac041d62e..0516e1edc613c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -971,8 +971,8 @@ case class CollapseCodegenStages(
       // The whole-stage-codegen framework is row-based. If a plan supports columnar execution,
       // it can't support whole-stage-codegen at the same time.
       assert(!plan.supportsColumnar)
-      val newId = codegenStageCounter.incrementAndGet()
-      val newPlan = WholeStageCodegenExec(insertInputAdapter(plan))(newId)
+      val newPlan = WholeStageCodegenExec(
+        insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
       plan.logicalLink.foreach(newPlan.setLogicalLink)
       newPlan
     case other =>

From 2384ce8a9b66686ca3961dbccdecafed911ea902 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Mon, 13 Oct 2025 20:45:16 +0800
Subject: [PATCH 3/7] address comments

---
 .../execution/datasources/FileFormatWriter.scala | 16 +++++++++-------
 .../sql/execution/datasources/V1Writes.scala     | 15 +++++++++------
 2 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 8f8dd0367f8ab..29864f1310fa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -157,14 +157,15 @@ object FileFormatWriter extends Logging {
     val requiredOrdering = {
       // We should first sort by dynamic partition columns, then bucket id, and finally sorting
       // columns.
-      val sortCols = partitionColumns.drop(numStaticPartitionCols) ++
+      val ordering = partitionColumns.drop(numStaticPartitionCols) ++
         writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
-      val ordering = sortCols.map(SortOrder(_, Ascending))
       plan.logicalLink match {
         case Some(WriteFiles(query, _, _, _, _, _)) =>
-          V1WritesUtils.eliminateFoldableOrdering(ordering, query).outputOrdering
+          V1WritesUtils.eliminateFoldableOrdering(
+            ordering.map(SortOrder(_, Ascending)), query).outputOrdering.map(_.child)
         case Some(query) =>
-          V1WritesUtils.eliminateFoldableOrdering(ordering, query).outputOrdering
+          V1WritesUtils.eliminateFoldableOrdering(
+            ordering.map(SortOrder(_, Ascending)), query).outputOrdering.map(_.child)
         case _ =>
           ordering
       }
@@ -214,7 +215,7 @@ object FileFormatWriter extends Logging {
       description: WriteJobDescription,
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
-      requiredOrdering: Seq[SortOrder],
+      requiredOrdering: Seq[Expression],
       partitionColumns: Seq[Attribute],
       sortColumns: Seq[Attribute],
       orderingMatched: Boolean): Set[String] = {
@@ -340,12 +341,13 @@ object FileFormatWriter extends Logging {

   private def createSortPlan(
       plan: SparkPlan,
-      requiredOrdering: Seq[SortOrder],
+      requiredOrdering: Seq[Expression],
       outputSpec: OutputSpec): SortExec = {
     // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
     // the physical plan may have different attribute ids due to optimizer removing some
     // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
-    val orderingExpr = bindReferences(requiredOrdering, outputSpec.outputColumns)
+    val orderingExpr = bindReferences(
+      requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
     SortExec(
       orderingExpr,
       global = false,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index e64f03649986f..392e3c92de3ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -106,10 +106,7 @@ object V1Writes extends Rule[LogicalPlan] {
       }.asInstanceOf[SortOrder])
       eliminateFoldableOrdering(ordering, empty2NullPlan).outputOrdering
     }
-    val outputOrdering = eliminateFoldableOrdering(
-      empty2NullPlan.outputOrdering, empty2NullPlan).outputOrdering
-    val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
-    if (orderingMatched) {
+    if (isOrderingMatched(requiredOrdering.map(_.child), empty2NullPlan.outputOrdering)) {
       empty2NullPlan
     } else {
       Sort(requiredOrdering, global = false, empty2NullPlan)
@@ -205,18 +202,24 @@ object V1WritesUtils {
     expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
   }

+  // SPARK-53738: the required ordering inferred from table schema (partition, bucketing, etc.)
+  // may contain foldable sort ordering expressions, which causes the optimized query's output
+  // ordering mismatch, here we calculate the required ordering more accurately, by creating a
+  // fake Sort node with the input query, then remove the foldable sort ordering expressions.
   def eliminateFoldableOrdering(ordering: Seq[SortOrder], query: LogicalPlan): LogicalPlan =
     EliminateSorts(FoldablePropagation(Sort(ordering, global = false, query)))

+  // The comparison ignores SortDirection and NullOrdering since it doesn't matter
+  // for writing cases.
   def isOrderingMatched(
-      requiredOrdering: Seq[SortOrder],
+      requiredOrdering: Seq[Expression],
       outputOrdering: Seq[SortOrder]): Boolean = {
     if (requiredOrdering.length > outputOrdering.length) {
       false
     } else {
       requiredOrdering.zip(outputOrdering).forall {
         case (requiredOrder, outputOrder) =>
-          outputOrder.satisfies(requiredOrder)
+          outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
       }
     }
   }

From fd401c06271c8fc5799dc874d91dcfa7fc0fb17c Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Mon, 13 Oct 2025 20:50:56 +0800
Subject: [PATCH 4/7] nit

---
 .../org/apache/spark/sql/execution/datasources/V1Writes.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index 392e3c92de3ce..4493d1a6e6895 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -202,7 +202,7 @@ object V1WritesUtils {
     expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
   }

-  // SPARK-53738: the required ordering inferred from table schema (partition, bucketing, etc.)
+  // SPARK-53738: the required ordering inferred from table spec (partition, bucketing, etc.)
   // may contain foldable sort ordering expressions, which causes the optimized query's output
   // ordering mismatch, here we calculate the required ordering more accurately, by creating a
   // fake Sort node with the input query, then remove the foldable sort ordering expressions.

From 7129fb28978bab31cc59f13a26bfab8b32c28210 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Tue, 14 Oct 2025 18:20:20 +0800
Subject: [PATCH 5/7] fallback get logicalLink from children

---
 .../spark/sql/execution/datasources/FileFormatWriter.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 29864f1310fa0..168d3e139cb6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -159,7 +159,9 @@ object FileFormatWriter extends Logging {
       // columns.
       val ordering = partitionColumns.drop(numStaticPartitionCols) ++
         writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
-      plan.logicalLink match {
+      plan.logicalLink.orElse {
+        plan.collectFirst { case p if p.logicalLink.isDefined => p.logicalLink.get }
+      } match {
         case Some(WriteFiles(query, _, _, _, _, _)) =>
           V1WritesUtils.eliminateFoldableOrdering(
             ordering.map(SortOrder(_, Ascending)), query).outputOrdering.map(_.child)

From f1df0ecf2018a4b811f76955f81f283704bc5199 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Thu, 16 Oct 2025 02:44:19 +0800
Subject: [PATCH 6/7] revert change of FileFormatWriter

---
 .../datasources/FileFormatWriter.scala        | 24 ++++---------------
 .../datasources/V1WriteCommandSuite.scala     | 18 ++++++++------
 .../command/V1WriteHiveCommandSuite.scala     |  4 +++-
 3 files changed, 18 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 168d3e139cb6c..55e2271dc058b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -138,6 +138,10 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )

+    // We should first sort by dynamic partition columns, then bucket id, and finally sorting
+    // columns.
+    val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
+      writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
     val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)

     // SPARK-40588: when planned writing is disabled and AQE is enabled,
@@ -153,26 +157,6 @@ object FileFormatWriter extends Logging {
     val actualOrdering = writeFilesOpt.map(_.child)
       .getOrElse(materializeAdaptiveSparkPlan(plan))
       .outputOrdering
-
-    val requiredOrdering = {
-      // We should first sort by dynamic partition columns, then bucket id, and finally sorting
-      // columns.
-      val ordering = partitionColumns.drop(numStaticPartitionCols) ++
-        writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
-      plan.logicalLink.orElse {
-        plan.collectFirst { case p if p.logicalLink.isDefined => p.logicalLink.get }
-      } match {
-        case Some(WriteFiles(query, _, _, _, _, _)) =>
-          V1WritesUtils.eliminateFoldableOrdering(
-            ordering.map(SortOrder(_, Ascending)), query).outputOrdering.map(_.child)
-        case Some(query) =>
-          V1WritesUtils.eliminateFoldableOrdering(
-            ordering.map(SortOrder(_, Ascending)), query).outputOrdering.map(_.child)
-        case _ =>
-          ordering
-      }
-    }
-
     val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)

     SQLExecution.checkSQLExecutionId(sparkSession)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index 9d4f8b99bc27f..a46afcef3cdb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -64,7 +64,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
       orderingMatched: Boolean,
       hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
     executeAndCheckOrderingAndCustomValidate(
-      hasLogicalSort, orderingMatched, hasEmpty2Null)(query)(_ => ())
+      hasLogicalSort, Some(orderingMatched), hasEmpty2Null)(query)(_ => ())
   }

   /**
@@ -72,7 +72,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
    */
   protected def executeAndCheckOrderingAndCustomValidate(
       hasLogicalSort: Boolean,
-      orderingMatched: Boolean,
+      orderingMatched: Option[Boolean],
       hasEmpty2Null: Boolean = false)(query: => Unit)(
       customValidate: LogicalPlan => Unit): Unit = {
     @volatile var optimizedPlan: LogicalPlan = null
@@ -98,10 +98,12 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper

     query

-    // Check whether the output ordering is matched before FileFormatWriter executes rdd.
-    assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
-      s"Expect orderingMatched: $orderingMatched, " +
-        s"Actual: ${FileFormatWriter.outputOrderingMatched}")
+    orderingMatched.foreach { matched =>
+      // Check whether the output ordering is matched before FileFormatWriter executes rdd.
+      assert(FileFormatWriter.outputOrderingMatched == matched,
+        s"Expect orderingMatched: $matched, " +
+          s"Actual: ${FileFormatWriter.outputOrderingMatched}")
+    }

     sparkContext.listenerBus.waitUntilEmpty()
@@ -416,8 +418,10 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
             |CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET
             |PARTITIONED BY (k)
             |""".stripMargin)
+        // Skip checking orderingMatched temporarily to avoid touching `FileFormatWriter`,
+        // see details at https://github.com/apache/spark/pull/52584#issuecomment-3407716019
         executeAndCheckOrderingAndCustomValidate(
-          hasLogicalSort = true, orderingMatched = true) {
+          hasLogicalSort = true, orderingMatched = None) {
           sql(
             """
               |INSERT OVERWRITE t
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
index b46ac5da0762e..a3e864ee55c66 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
@@ -138,8 +138,10 @@ class V1WriteHiveCommandSuite
                 |CREATE TABLE t(i INT, j INT, k STRING) STORED AS PARQUET
                 |PARTITIONED BY (k)
                 |""".stripMargin)
+            // Skip checking orderingMatched temporarily to avoid touching `FileFormatWriter`,
+            // see details at https://github.com/apache/spark/pull/52584#issuecomment-3407716019
             executeAndCheckOrderingAndCustomValidate(
-              hasLogicalSort = true, orderingMatched = true) {
+              hasLogicalSort = true, orderingMatched = None) {
               sql(
                 """
                   |INSERT OVERWRITE t

From 531c6bf05111ac35728b53e1e1df028810dae2f9 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Sat, 18 Oct 2025 11:34:25 +0800
Subject: [PATCH 7/7] revert setting logical link on WholeStageCodegenExec

---
 .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 0516e1edc613c..246508965d3d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -971,10 +971,7 @@ case class CollapseCodegenStages(
       // The whole-stage-codegen framework is row-based. If a plan supports columnar execution,
       // it can't support whole-stage-codegen at the same time.
       assert(!plan.supportsColumnar)
-      val newPlan = WholeStageCodegenExec(
-        insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
-      plan.logicalLink.foreach(newPlan.setLogicalLink)
-      newPlan
+      WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
     case other =>
       other.withNewChildren(other.children.map(insertWholeStageCodegen))
   }
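
For anyone trying the change locally, below is a minimal spark-shell sketch of the scenario this series targets; it is not part of the patch, and it assumes a running `spark` session plus a source table `t0(i INT, j INT)`, mirroring the new tests. The point it illustrates: the optimizer folds the constant partition column `k` out of the user's `SORT BY`, so only `i` survives in the query's output ordering, and the required write ordering must have its foldable keys pruned the same way or the user's custom order on `i` can be lost.

```scala
// Hypothetical repro; `spark` and t0(i INT, j INT) are assumed to exist.
spark.sql("CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k)")

// The literal partition value is folded away during optimization, so the optimized
// plan's output ordering contains only `i`.
val df = spark.sql("SELECT i, j, '0' AS k FROM t0 SORT BY k, i")
df.queryExecution.optimizedPlan.outputOrdering.map(_.child).foreach(println)

// The write path compares its required ordering (dynamic partition columns, bucket id,
// sort columns) against that output ordering; with foldable keys pruned from the
// required ordering, this insert keeps the per-partition order on `i`.
spark.sql("INSERT OVERWRITE t SELECT i, j, '0' AS k FROM t0 SORT BY k, i")
```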