From 4d8c000d542e07fafb2c78170a667efd853a4f78 Mon Sep 17 00:00:00 2001 From: Mihailo Timotic Date: Mon, 7 Jul 2025 11:47:31 +0200 Subject: [PATCH] fix --- .../sql/catalyst/analysis/Analyzer.scala | 1 + .../analysis/StripIsDuplicateMetadata.scala | 74 ++++++++++++ .../analysis/resolver/ResolverRunner.scala | 3 +- .../apache/spark/sql/internal/SQLConf.scala | 8 ++ .../sql/StripIsDuplicateMetadataSuite.scala | 106 ++++++++++++++++++ 5 files changed, 191 insertions(+), 1 deletion(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StripIsDuplicateMetadata.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/StripIsDuplicateMetadataSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 78ae9c8afe467..a2c69aa0a73a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -504,6 +504,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor ResolveEncodersInUDF), Batch("Subquery", Once, UpdateOuterReferences), + Batch("Strip __is_duplicate metadata", Once, StripIsDuplicateMetadata), Batch("Cleanup", fixedPoint, CleanupAliases), Batch("HandleSpecialCommand", Once, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StripIsDuplicateMetadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StripIsDuplicateMetadata.scala new file mode 100644 index 0000000000000..f3f4163a4c40d --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StripIsDuplicateMetadata.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.CurrentOrigin +import org.apache.spark.sql.catalyst.trees.TreePattern.{ALIAS, ATTRIBUTE_REFERENCE} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder + +/** + * Strips is duplicate metadata from all attributes and aliases as it is no longer needed after + * resolution. + */ +object StripIsDuplicateMetadata extends Rule[LogicalPlan] with SQLConfHelper { + def apply(plan: LogicalPlan): LogicalPlan = AnalysisHelper.allowInvokingTransformsInAnalyzer { + if (!conf.getConf(SQLConf.STRIP_IS_DUPLICATE_METADATA)) { + plan + } else { + stripIsDuplicateMetadata(plan) + } + } + + private def stripIsDuplicateMetadata(plan: LogicalPlan) = + plan.transformAllExpressionsWithPruning(_.containsAnyPattern(ALIAS, ATTRIBUTE_REFERENCE)) { + case alias: Alias if alias.metadata.contains("__is_duplicate") => + val newMetadata = new MetadataBuilder() + .withMetadata(alias.metadata) + .remove("__is_duplicate") + .build() + + val newAlias = CurrentOrigin.withOrigin(alias.origin) { + Alias(child = alias.child, name = alias.name)( + exprId = alias.exprId, + qualifier = alias.qualifier, + explicitMetadata = Some(newMetadata), + nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys + ) + } + newAlias.copyTagsFrom(alias) + + newAlias + case attribute: Attribute if attribute.metadata.contains("__is_duplicate") => + val newMetadata = new MetadataBuilder() + .withMetadata(attribute.metadata) + .remove("__is_duplicate") + .build() + + val newAttribute = CurrentOrigin.withOrigin(attribute.origin) { + attribute.withMetadata(newMetadata = newMetadata) + } + newAttribute.copyTagsFrom(attribute) + + newAttribute + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverRunner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverRunner.scala index 37d41919f1323..74faae8c8fd7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverRunner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverRunner.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis.resolver import org.apache.spark.sql.catalyst.{QueryPlanningTracker, SQLConfHelper} -import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, CleanupAliases} +import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, CleanupAliases, StripIsDuplicateMetadata} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf @@ -41,6 +41,7 @@ class ResolverRunner( */ private val planRewriteRules: Seq[Rule[LogicalPlan]] = Seq( PruneMetadataColumns, + StripIsDuplicateMetadata, CleanupAliases ) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3f648a91d7d0b..d85dbc465a59c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -241,6 +241,14 @@ object SQLConf { } } + val STRIP_IS_DUPLICATE_METADATA = + buildConf("spark.sql.analyzer.stripIsDuplicateMetadata") + .internal() + .doc("When true, strip __is_duplicate metadata after resolution batch in analysis since " + + "it is no longer needed.") + .booleanConf + .createWithDefault(true) + val ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS = buildConf("spark.sql.analyzer.uniqueNecessaryMetadataColumns") .internal() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StripIsDuplicateMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StripIsDuplicateMetadataSuite.scala new file mode 100644 index 0000000000000..5b4a5c06fec01 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/StripIsDuplicateMetadataSuite.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.ArrayList + +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession + +class StripIsDuplicateMetadataSuite extends QueryTest with SharedSparkSession { + + test("Strip __is_duplicate from project list") { + withTable("t1") { + sql("CREATE TABLE t1(col1 INT, col2 INT)") + val query = + """SELECT * FROM ( + | SELECT col1, col1 FROM t1 + | UNION + | SELECT col1, col2 FROM t1 + |)""".stripMargin + + checkMetadata(query) + } + } + + test("Strip __is_duplicate from Union output") { + withTable("t1") { + sql("CREATE TABLE t1(col1 INT, col2 INT)") + val query = + """SELECT col1, col1 FROM t1 + |UNION + |SELECT col1, col2 FROM t1""".stripMargin + + checkMetadata(query) + } + } + + test("Strip __is_duplicate from CTEs") { + withTable("t1") { + sql("CREATE TABLE t1(col1 INT, col2 INT)") + val query = + """WITH cte1 AS ( + | SELECT col1, col1 FROM t1 + |), + |cte2 AS ( + | SELECT col1, col2 FROM t1 + |) + |SELECT * FROM cte1 + |UNION + |SELECT * FROM cte2""".stripMargin + + checkMetadata(query) + } + } + + test("Strip __is_duplicate from subquery") { + withTable("t1") { + sql("CREATE TABLE t1(col1 INT, col2 INT)") + val query = + """SELECT sub.col1 + |FROM ( + | SELECT col1, col1 FROM t1 + | UNION + | SELECT col1, col2 FROM t1 + |) sub + """.stripMargin + + checkMetadata(query) + } + } + + private def checkMetadata(query: String): Unit = { + for (stripMetadata <- Seq(true, false)) { + withSQLConf(SQLConf.STRIP_IS_DUPLICATE_METADATA.key -> stripMetadata.toString) { + val analyzedPlan = sql(query).queryExecution.analyzed + val duplicateAttributes = new ArrayList[NamedExpression] + analyzedPlan.foreachWithSubqueries { + case plan: LogicalPlan => plan.expressions.foreach { + case namedExpression: NamedExpression + if namedExpression.metadata.contains("__is_duplicate") => + duplicateAttributes.add(namedExpression) + case _ => + } + } + assert(duplicateAttributes.isEmpty == stripMetadata) + } + } + } +}