Skip to content

[SPARK-52696][SQL] Strip __is_duplicate metadata after analysis #51389

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
ResolveEncodersInUDF),
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Strip __is_duplicate metadata", Once, StripIsDuplicateMetadata),
Batch("Cleanup", fixedPoint,
CleanupAliases),
Batch("HandleSpecialCommand", Once,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.TreePattern.{ALIAS, ATTRIBUTE_REFERENCE}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.MetadataBuilder

/**
 * Strips the internal `__is_duplicate` metadata key from all [[Attribute]]s and [[Alias]]es in
 * the plan, as it is no longer needed after resolution and should not leak out of the analyzer.
 */
object StripIsDuplicateMetadata extends Rule[LogicalPlan] with SQLConfHelper {

  // The internal metadata key this rule removes. Kept in one place so the two match arms below
  // cannot drift apart.
  private val IsDuplicateKey = "__is_duplicate"

  def apply(plan: LogicalPlan): LogicalPlan = AnalysisHelper.allowInvokingTransformsInAnalyzer {
    // Guarded by a conf so the stripping can be disabled as an escape hatch.
    if (!conf.getConf(SQLConf.STRIP_IS_DUPLICATE_METADATA)) {
      plan
    } else {
      stripIsDuplicateMetadata(plan)
    }
  }

  /**
   * Rewrites every [[Alias]] and [[Attribute]] carrying the `__is_duplicate` key with a copy
   * whose metadata no longer contains it. Pruned to trees that actually contain aliases or
   * attribute references.
   */
  private def stripIsDuplicateMetadata(plan: LogicalPlan): LogicalPlan =
    plan.transformAllExpressionsWithPruning(_.containsAnyPattern(ALIAS, ATTRIBUTE_REFERENCE)) {
      case alias: Alias if alias.metadata.contains(IsDuplicateKey) =>
        val newMetadata = new MetadataBuilder()
          .withMetadata(alias.metadata)
          .remove(IsDuplicateKey)
          .build()

        // Rebuild the alias under the original origin so error positions are preserved, and
        // keep the original exprId/qualifier so references elsewhere in the plan stay valid.
        val newAlias = CurrentOrigin.withOrigin(alias.origin) {
          Alias(child = alias.child, name = alias.name)(
            exprId = alias.exprId,
            qualifier = alias.qualifier,
            explicitMetadata = Some(newMetadata),
            nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys
          )
        }
        // Preserve tree node tags attached to the old alias.
        newAlias.copyTagsFrom(alias)

        newAlias
      case attribute: Attribute if attribute.metadata.contains(IsDuplicateKey) =>
        val newMetadata = new MetadataBuilder()
          .withMetadata(attribute.metadata)
          .remove(IsDuplicateKey)
          .build()

        val newAttribute = CurrentOrigin.withOrigin(attribute.origin) {
          attribute.withMetadata(newMetadata = newMetadata)
        }
        // Preserve tree node tags attached to the old attribute.
        newAttribute.copyTagsFrom(attribute)

        newAttribute
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis.resolver

import org.apache.spark.sql.catalyst.{QueryPlanningTracker, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, CleanupAliases}
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, CleanupAliases, StripIsDuplicateMetadata}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -41,6 +41,7 @@ class ResolverRunner(
*/
// Plan rewrite rules listed in application order as given here; StripIsDuplicateMetadata runs
// before CleanupAliases so the __is_duplicate key is removed while aliases are still present.
// NOTE(review): assumes the runner applies these rules in Seq order — confirm against the
// surrounding ResolverRunner code (class header not visible in this chunk).
private val planRewriteRules: Seq[Rule[LogicalPlan]] = Seq(
PruneMetadataColumns,
StripIsDuplicateMetadata,
CleanupAliases
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,14 @@ object SQLConf {
}
}

// Internal escape-hatch flag (default true): when enabled, the analyzer strips the internal
// __is_duplicate metadata key from attributes/aliases after resolution; when disabled, the
// metadata is left on the analyzed plan.
val STRIP_IS_DUPLICATE_METADATA =
buildConf("spark.sql.analyzer.stripIsDuplicateMetadata")
.internal()
.doc("When true, strip __is_duplicate metadata after resolution batch in analysis since " +
"it is no longer needed.")
.booleanConf
.createWithDefault(true)

val ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS =
buildConf("spark.sql.analyzer.uniqueNecessaryMetadataColumns")
.internal()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.util.ArrayList

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

/**
 * Verifies that the `__is_duplicate` metadata key is stripped from analyzed plans when
 * `spark.sql.analyzer.stripIsDuplicateMetadata` is enabled, and retained when it is disabled.
 */
class StripIsDuplicateMetadataSuite extends QueryTest with SharedSparkSession {

  test("Strip __is_duplicate from project list") {
    withTable("t1") {
      sql("CREATE TABLE t1(col1 INT, col2 INT)")
      val query =
        """SELECT * FROM (
          |  SELECT col1, col1 FROM t1
          |  UNION
          |  SELECT col1, col2 FROM t1
          |)""".stripMargin

      checkMetadata(query)
    }
  }

  test("Strip __is_duplicate from Union output") {
    withTable("t1") {
      sql("CREATE TABLE t1(col1 INT, col2 INT)")
      val query =
        """SELECT col1, col1 FROM t1
          |UNION
          |SELECT col1, col2 FROM t1""".stripMargin

      checkMetadata(query)
    }
  }

  test("Strip __is_duplicate from CTEs") {
    withTable("t1") {
      sql("CREATE TABLE t1(col1 INT, col2 INT)")
      val query =
        """WITH cte1 AS (
          |  SELECT col1, col1 FROM t1
          |),
          |cte2 AS (
          |  SELECT col1, col2 FROM t1
          |)
          |SELECT * FROM cte1
          |UNION
          |SELECT * FROM cte2""".stripMargin

      checkMetadata(query)
    }
  }

  test("Strip __is_duplicate from subquery") {
    withTable("t1") {
      sql("CREATE TABLE t1(col1 INT, col2 INT)")
      val query =
        """SELECT sub.col1
          |FROM (
          |  SELECT col1, col1 FROM t1
          |  UNION
          |  SELECT col1, col2 FROM t1
          |) sub
        """.stripMargin

      checkMetadata(query)
    }
  }

  /**
   * Analyzes `query` twice — once with metadata stripping enabled and once disabled — and
   * asserts that `__is_duplicate` metadata is absent exactly when stripping is enabled.
   */
  private def checkMetadata(query: String): Unit = {
    for (stripMetadata <- Seq(true, false)) {
      withSQLConf(SQLConf.STRIP_IS_DUPLICATE_METADATA.key -> stripMetadata.toString) {
        val analyzedPlan = sql(query).queryExecution.analyzed
        // Collect every named expression (including those in subqueries) still carrying the key.
        val duplicateAttributes =
          scala.collection.mutable.ArrayBuffer.empty[NamedExpression]
        analyzedPlan.foreachWithSubqueries {
          case plan: LogicalPlan => plan.expressions.foreach {
            case namedExpression: NamedExpression
                if namedExpression.metadata.contains("__is_duplicate") =>
              duplicateAttributes += namedExpression
            case _ =>
          }
        }
        assert(
          duplicateAttributes.isEmpty == stripMetadata,
          s"stripMetadata=$stripMetadata but expressions still carrying __is_duplicate: " +
            duplicateAttributes.mkString(", "))
      }
    }
  }
}