
Commit 9b24d4c

Burak Yavuz authored and cloud-fan committed
[SPARK-52300][SQL] Make SQL UDTVF resolution use consistent configurations with view resolution
### What changes were proposed in this pull request?

View resolution and SQL UDTVF resolution store behavior-related SQL configurations at creation time. However, view resolution can also pass on additional configurations that were set after session creation. This PR refactors the view-resolution configuration retention into an Analyzer function and uses it consistently for both view resolution and SQL UDTVF resolution.

### Why are the changes needed?

When resolving SQL user-defined table-valued functions, catalog options are not registered correctly if the configurations were overridden after the session was created (that is, not available as an override at Spark startup). This is not a problem during view resolution. The discrepancy is unnecessary: UDTVF resolution should be consistent with view resolution in terms of which SQL configurations get passed down.

### Does this PR introduce _any_ user-facing change?

If a SQL configuration, such as a catalog option (prefixed by `spark.sql.catalog.`), is overridden during a session, it was not being passed down correctly to SQL user-defined table-valued functions during resolution. This PR fixes that.

### How was this patch tested?

Unit tests. Added a feature flag and tested the changes with the feature flag off to maintain the old behavior.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#51014 from brkyvz/viewUDFRes.

Lead-authored-by: Burak Yavuz <brkyvz@apache.org>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent a7cb882 commit 9b24d4c
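
To make the scenario concrete, here is a minimal sketch of the behavior this commit addresses. It is illustrative only: the table and function names are made up, and it assumes an active `SparkSession` named `spark` on a build that supports SQL table-valued functions.

```scala
// Override a catalog option after the session has already been created.
spark.conf.set("spark.sql.catalog.x.y", "true")

spark.sql("CREATE TABLE src AS SELECT id AS a FROM range(10)")
spark.sql(
  """CREATE OR REPLACE TEMPORARY FUNCTION f1() RETURNS TABLE (a BIGINT)
    |RETURN SELECT * FROM src
    |""".stripMargin)

// Before this change, analyzer rules running while f1() is resolved did not see
// spark.sql.catalog.x.y, even though a view resolved in the same session would.
spark.sql("SELECT * FROM f1()").show()
```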

File tree

4 files changed (+254, -31 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 40 additions & 6 deletions
@@ -242,6 +242,34 @@ object AnalysisContext {
   }
 }
 
+object Analyzer {
+  // List of configurations that should be passed on when resolving views and SQL UDF.
+  private val RETAINED_ANALYSIS_FLAGS = Seq(
+    // retainedHiveConfigs
+    // TODO: remove these Hive-related configs after the `RelationConversions` is moved to
+    // optimization phase.
+    "spark.sql.hive.convertMetastoreParquet",
+    "spark.sql.hive.convertMetastoreOrc",
+    "spark.sql.hive.convertInsertingPartitionedTable",
+    "spark.sql.hive.convertInsertingUnpartitionedTable",
+    "spark.sql.hive.convertMetastoreCtas",
+    // retainedLoggingConfigs
+    "spark.sql.planChangeLog.level",
+    "spark.sql.expressionTreeChangeLog.level"
+  )
+
+  def retainResolutionConfigsForAnalysis(newConf: SQLConf, existingConf: SQLConf): Unit = {
+    val retainedConfigs = existingConf.getAllConfs.filter { case (key, _) =>
+      // Also apply catalog configs
+      RETAINED_ANALYSIS_FLAGS.contains(key) || key.startsWith("spark.sql.catalog.")
+    }
+
+    retainedConfigs.foreach { case (k, v) =>
+      newConf.settings.put(k, v)
+    }
+  }
+}
+
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
  * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
@@ -2486,9 +2514,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       val plan = v1SessionCatalog.makeSQLFunctionPlan(f.name, f.function, f.inputs)
       val resolved = SQLFunctionContext.withSQLFunction {
         // Resolve the SQL function plan using its context.
-        val conf = new SQLConf()
-        f.function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, v) }
-        SQLConf.withExistingConf(conf) {
+        val newConf = new SQLConf()
+        f.function.getSQLConfigs.foreach { case (k, v) => newConf.settings.put(k, v) }
+        if (conf.getConf(SQLConf.APPLY_SESSION_CONF_OVERRIDES_TO_FUNCTION_RESOLUTION)) {
+          Analyzer.retainResolutionConfigsForAnalysis(newConf = newConf, existingConf = conf)
+        }
+        SQLConf.withExistingConf(newConf) {
           executeSameContext(plan)
         }
       }
@@ -2779,9 +2810,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       _.containsPattern(SQL_TABLE_FUNCTION)) {
       case SQLTableFunction(name, function, inputs, output) =>
         // Resolve the SQL table function plan using its function context.
-        val conf = new SQLConf()
-        function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, v) }
-        val resolved = SQLConf.withExistingConf(conf) {
+        val newConf = new SQLConf()
+        function.getSQLConfigs.foreach { case (k, v) => newConf.settings.put(k, v) }
+        if (conf.getConf(SQLConf.APPLY_SESSION_CONF_OVERRIDES_TO_FUNCTION_RESOLUTION)) {
+          Analyzer.retainResolutionConfigsForAnalysis(newConf = newConf, existingConf = conf)
+        }
+        val resolved = SQLConf.withExistingConf(newConf) {
          val plan = v1SessionCatalog.makeSQLTableFunctionPlan(name, function, inputs, output)
          SQLFunctionContext.withSQLFunction {
            executeSameContext(plan)
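
The retained-configuration logic added above is a plain key filter. The following standalone sketch (ordinary Scala collections, no Spark dependencies; the session conf contents are made up) mirrors what `retainResolutionConfigsForAnalysis` copies into the conf used for resolution:

```scala
object RetentionSketch {
  // Mirrors RETAINED_ANALYSIS_FLAGS from the patch.
  private val retainedFlags = Set(
    "spark.sql.hive.convertMetastoreParquet",
    "spark.sql.hive.convertMetastoreOrc",
    "spark.sql.hive.convertInsertingPartitionedTable",
    "spark.sql.hive.convertInsertingUnpartitionedTable",
    "spark.sql.hive.convertMetastoreCtas",
    "spark.sql.planChangeLog.level",
    "spark.sql.expressionTreeChangeLog.level")

  /** Keep only the flags above plus any catalog config, as the new helper does. */
  def retained(sessionConf: Map[String, String]): Map[String, String] =
    sessionConf.filter { case (key, _) =>
      retainedFlags.contains(key) || key.startsWith("spark.sql.catalog.")
    }

  def main(args: Array[String]): Unit = {
    val sessionConf = Map(                       // example values, not real defaults
      "spark.sql.catalog.x.y" -> "true",
      "spark.sql.planChangeLog.level" -> "info",
      "spark.sql.shuffle.partitions" -> "5")
    // Only the first two entries survive; shuffle.partitions is not retained.
    println(retained(sessionConf))
  }
}
```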

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 3 additions & 25 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.{AliasIdentifier, InternalRow, SQLConfHelper}
-import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
 import org.apache.spark.sql.catalyst.expressions._
@@ -853,33 +853,11 @@ object View {
     // For temporary view, we always use captured sql configs
     if (activeConf.useCurrentSQLConfigsForView && !isTempView) return activeConf
 
-    // We retain below configs from current session because they are not captured by view
-    // as optimization configs but they are still needed during the view resolution.
-    // TODO: remove this `retainedHiveConfigs` after the `RelationConversions` is moved to
-    // optimization phase.
-    val retainedHiveConfigs = Seq(
-      "spark.sql.hive.convertMetastoreParquet",
-      "spark.sql.hive.convertMetastoreOrc",
-      "spark.sql.hive.convertInsertingPartitionedTable",
-      "spark.sql.hive.convertInsertingUnpartitionedTable",
-      "spark.sql.hive.convertMetastoreCtas"
-    )
-
-    val retainedLoggingConfigs = Seq(
-      "spark.sql.planChangeLog.level",
-      "spark.sql.expressionTreeChangeLog.level"
-    )
-
-    val retainedConfigs = activeConf.getAllConfs.filter { case (key, _) =>
-      retainedHiveConfigs.contains(key) || retainedLoggingConfigs.contains(key) || key.startsWith(
-        "spark.sql.catalog."
-      )
-    }
-
     val sqlConf = new SQLConf()
-    for ((k, v) <- configs ++ retainedConfigs) {
+    for ((k, v) <- configs) {
      sqlConf.settings.put(k, v)
     }
+    Analyzer.retainResolutionConfigsForAnalysis(newConf = sqlConf, existingConf = activeConf)
     sqlConf
   }
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
@@ -2035,6 +2035,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val APPLY_SESSION_CONF_OVERRIDES_TO_FUNCTION_RESOLUTION =
+    buildConf("spark.sql.analyzer.sqlFunctionResolution.applyConfOverrides")
+      .internal()
+      .version("4.0.1")
+      .doc("When true, applies the conf overrides for certain feature flags during the " +
+        "resolution of user-defined sql table valued functions, consistent with view resolution.")
+      .booleanConf
+      .createWithDefault(true)
+
   // Whether to retain group by columns or not in GroupedData.agg.
   val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns")
     .version("1.4.0")
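
The flag above defaults to true. If the pre-fix behavior is needed, it can be disabled per session; a short sketch (assumes an active `SparkSession` named `spark`):

```scala
// Restores the old behavior: session-level overrides are no longer
// propagated into SQL UDTVF resolution.
spark.conf.set("spark.sql.analyzer.sqlFunctionResolution.applyConfOverrides", "false")
```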
Lines changed: 202 additions & 0 deletions
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.analysis
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.test.SharedSparkSession
+
+class AnalysisConfOverrideSuite extends SharedSparkSession {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.extensions", "org.apache.spark.sql.analysis.ConfOverrideValidationExtensions")
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sql("SELECT id as a FROM range(10)").createTempView("table")
+    spark.sql("SELECT id as a, (id + 1) as b FROM range(10)").createTempView("table2")
+  }
+
+  override def afterAll(): Unit = {
+    spark.catalog.dropTempView("table")
+    spark.catalog.dropTempView("table2")
+    super.afterAll()
+  }
+
+  private def testOverride(testName: String)(f: (String, String) => Unit): Unit = {
+    test(testName) {
+      val key = "spark.sql.catalog.x.y"
+      val value = "true"
+      withSQLConf(key -> value) {
+        f(key, value)
+      }
+    }
+  }
+
+  testOverride("simple plan") { case (key, value) =>
+    ValidateConfOverrideRule.withConfValidationEnabled(key, value) {
+      spark.sql("SELECT * FROM TaBlE")
+    }
+  }
+
+  testOverride("CTE") { case (key, value) =>
+    ValidateConfOverrideRule.withConfValidationEnabled(key, value) {
+      spark.sql(
+        """WITH cte AS (SELECT * FROM TaBlE)
+          |SELECT * FROM cte
+          |""".stripMargin)
+    }
+  }
+
+  testOverride("Subquery") { case (key, value) =>
+    ValidateConfOverrideRule.withConfValidationEnabled(key, value) {
+      spark.sql(
+        """
+          |SELECT * FROM TaBlE WHERE a in (SELECT a FROM table2)
+          |""".stripMargin)
+    }
+  }
+
+  testOverride("View") { case (key, value) =>
+    withTable("test_table", "test_table2") {
+      spark.sql("CREATE TABLE test_table AS SELECT id as a FROM range(10)")
+      spark.sql("CREATE TABLE test_table2 AS SELECT id as a, (id + 1) as b FROM range(10)")
+      withView("test_view") {
+        spark.sql("CREATE VIEW test_view AS " +
+          "SELECT * FROM test_table WHERE a in (SELECT a FROM test_table2)")
+
+        ValidateConfOverrideRule.withConfValidationEnabled(key, value) {
+          spark.sql("SELECT * FROM test_view")
+        }
+      }
+    }
+  }
+
+  testOverride("user defined SQL functions") { case (key, value) =>
+    withTable("test_table", "test_table2") {
+      spark.sql("CREATE TABLE test_table AS SELECT id as a FROM range(10)")
+      spark.sql("CREATE TABLE test_table2 AS SELECT id as a, (id + 1) as b FROM range(10)")
+      withUserDefinedFunction("f1" -> true, "f2" -> false, "f3" -> false) {
+        spark.sql(
+          """CREATE OR REPLACE TEMPORARY FUNCTION f1() RETURNS TABLE (a bigint)
+            |RETURN SELECT * FROM test_table WHERE a in (SELECT a FROM test_table2)
+            |""".stripMargin
+        )
+        spark.sql(
+          """CREATE OR REPLACE FUNCTION f2() RETURNS TABLE (a bigint)
+            |RETURN SELECT * FROM test_table WHERE a in (SELECT a FROM test_table2)
+            |""".stripMargin
+        )
+        spark.sql(
+          """CREATE OR REPLACE FUNCTION f3(in bigint) RETURNS (out bigint)
+            |RETURN in + 1
+            |""".stripMargin
+        )
+        ValidateConfOverrideRule.withConfValidationEnabled(key, value) {
+          spark.sql("SELECT * FROM f1()")
+        }
+        ValidateConfOverrideRule.withConfValidationEnabled(key, value) {
+          spark.sql("SELECT * FROM f2()")
+        }
+        ValidateConfOverrideRule.withConfValidationEnabled(key, value) {
+          spark.sql("SELECT f3(1)")
+        }
+      }
+    }
+  }
+
+  testOverride("user defined SQL functions - test conf disabled") { case (key, value) =>
+    withTable("test_table", "test_table2") {
+      spark.sql("CREATE TABLE test_table AS SELECT id as a FROM range(10)")
+      spark.sql("CREATE TABLE test_table2 AS SELECT id as a, (id + 1) as b FROM range(10)")
+      // turn the flag off to maintain former behavior
+      withSQLConf("spark.sql.analyzer.sqlFunctionResolution.applyConfOverrides" -> "false") {
+        withUserDefinedFunction("f1" -> true, "f2" -> false, "f3" -> false) {
+          spark.sql(
+            """CREATE OR REPLACE TEMPORARY FUNCTION f1() RETURNS TABLE (a bigint)
+              |RETURN SELECT * FROM test_table WHERE a in (SELECT a FROM test_table2)
+              |""".stripMargin
+          )
+          spark.sql(
+            """CREATE OR REPLACE FUNCTION f2() RETURNS TABLE (a bigint)
+              |RETURN SELECT * FROM test_table WHERE a in (SELECT a FROM test_table2)
+              |""".stripMargin
+          )
+          spark.sql(
+            """CREATE OR REPLACE FUNCTION f3(in bigint) RETURNS (out bigint)
+              |RETURN in + 1
+              |""".stripMargin
+          )
+          intercept[AssertionError] {
+            ValidateConfOverrideRule.withConfValidationEnabled(key, value) {
+              spark.sql("SELECT * FROM f1()")
+            }
+          }
+          intercept[AssertionError] {
+            ValidateConfOverrideRule.withConfValidationEnabled(key, value) {
+              spark.sql("SELECT * FROM f2()")
+            }
+          }
+          intercept[AssertionError] {
+            ValidateConfOverrideRule.withConfValidationEnabled(key, value) {
+              spark.sql("SELECT f3(1)")
+            }
+          }
+        }
+      }
+    }
+  }
+}
+
+/** Utility singleton object to orchestrate the test. */
+object ValidateConfOverrideRule {
+  private var confToCheck: Option[(String, String)] = None
+  private var isCalled: Boolean = false
+
+  def withConfValidationEnabled(key: String, value: String)(f: => Unit): Unit = {
+    try {
+      confToCheck = Some(key -> value)
+      f
+      assert(isCalled, "The rule was enabled, but not called. This is a test setup error.")
+    } finally {
+      isCalled = false
+      confToCheck = None
+    }
+  }
+}
+
+class ValidateConfOverrideRule extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    ValidateConfOverrideRule.confToCheck.foreach { case (k, v) =>
+      assert(conf.getConfString(k) == v,
+        s"The feature wasn't enabled within plan:\n$plan")
+      ValidateConfOverrideRule.isCalled = true
+    }
+    plan
+  }
+}
+
+class ConfOverrideValidationExtensions extends (SparkSessionExtensions => Unit) {
+  override def apply(extensions: SparkSessionExtensions): Unit = {
+    extensions.injectResolutionRule(_ => new ValidateConfOverrideRule)
+  }
+}
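
As a side note, the same validation rule could also be injected through the builder API instead of the `spark.sql.extensions` conf; a hedged sketch (the master and application name are illustrative, not from the patch):

```scala
import org.apache.spark.sql.SparkSession

// Inject ValidateConfOverrideRule programmatically rather than via spark.sql.extensions.
val session = SparkSession.builder()
  .master("local[1]")
  .appName("conf-override-check")
  .withExtensions(_.injectResolutionRule(_ => new ValidateConfOverrideRule))
  .getOrCreate()
```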
