
Commit 12790a4

queryContext addition

1 parent c53c17c

7 files changed: +40 -37 lines changed

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala
Lines changed: 2 additions & 4 deletions

@@ -132,8 +132,7 @@ private class FlowResolver(rawGraph: DataflowGraph) extends Logging {
       allInputs = allInputs,
       availableInputs = availableResolvedInputs.values.toList,
       configuration = flowToResolve.sqlConf,
-      currentCatalog = flowToResolve.currentCatalog,
-      currentDatabase = flowToResolve.currentDatabase
+      queryContext = flowToResolve.queryContext
     )
     val result =
       flowFunctionResult match {
@@ -179,8 +178,7 @@ private class FlowResolver(rawGraph: DataflowGraph) extends Logging {
         allInputs = allInputs,
         availableInputs = availableResolvedInputs.values.toList,
         configuration = newSqlConf,
-        currentCatalog = flowToResolve.currentCatalog,
-        currentDatabase = flowToResolve.currentDatabase
+        queryContext = flowToResolve.queryContext
       )
     } else {
       f

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
Lines changed: 13 additions & 13 deletions

@@ -26,6 +26,13 @@ import org.apache.spark.sql.pipelines.AnalysisWarning
 import org.apache.spark.sql.pipelines.util.InputReadOptions
 import org.apache.spark.sql.types.StructType
 
+/**
+ * Contains the catalog and database context information for query execution.
+ */
+case class QueryContext(
+    currentCatalog: Option[String],
+    currentDatabase: Option[String])
+
 /**
  * A [[Flow]] is a node of data transformation in a dataflow graph. It describes the movement
  * of data into a particular dataset.
@@ -49,11 +56,8 @@ trait Flow extends GraphElement with Logging {
    */
   def once: Boolean = false
 
-  /** The current catalog in the execution context when the query is defined. */
-  def currentCatalog: Option[String]
-
-  /** The current database in the execution context when the query is defined. */
-  def currentDatabase: Option[String]
+  /** The current query context (catalog and database) when the query is defined. */
+  def queryContext: QueryContext
 
   /** The comment associated with this flow */
   def comment: Option[String]
@@ -74,16 +78,14 @@ trait FlowFunction extends Logging {
   * [[DataflowGraph]].
   * @param availableInputs the list of all [[Input]]s available to this flow
   * @param configuration the spark configurations that apply to this flow.
-  * @param currentCatalog The current catalog in execution context when the query is defined.
-  * @param currentDatabase The current database in execution context when the query is defined.
+  * @param queryContext The context of the query being evaluated.
   * @return the inputs actually used, and the [[DataFrame]] expression for the flow
   */
  def call(
      allInputs: Set[TableIdentifier],
      availableInputs: Seq[Input],
      configuration: Map[String, String],
-      currentCatalog: Option[String],
-      currentDatabase: Option[String]
+      queryContext: QueryContext
  ): FlowFunctionResult
 }
 
@@ -127,8 +129,7 @@ case class UnresolvedFlow(
     identifier: TableIdentifier,
     destinationIdentifier: TableIdentifier,
     func: FlowFunction,
-    currentCatalog: Option[String],
-    currentDatabase: Option[String],
+    queryContext: QueryContext,
     sqlConf: Map[String, String],
     comment: Option[String] = None,
     override val once: Boolean,
@@ -147,8 +148,7 @@ trait ResolutionCompletedFlow extends Flow {
   val identifier: TableIdentifier = flow.identifier
   val destinationIdentifier: TableIdentifier = flow.destinationIdentifier
   def func: FlowFunction = flow.func
-  def currentCatalog: Option[String] = flow.currentCatalog
-  def currentDatabase: Option[String] = flow.currentDatabase
+  def queryContext: QueryContext = flow.queryContext
   def comment: Option[String] = flow.comment
   def sqlConf: Map[String, String] = funcResult.sqlConf
   def origin: QueryOrigin = flow.origin
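
The new QueryContext case class is the heart of the commit: one parameter object now travels through FlowFunction.call, UnresolvedFlow, and the resolution traits instead of two parallel Option[String] parameters. A minimal, self-contained sketch of how a caller might construct and consume it (the object name, describe helper, and values below are illustrative, not code from the commit):

    // Illustrative only: mirrors the QueryContext case class added in Flow.scala.
    case class QueryContext(
        currentCatalog: Option[String],
        currentDatabase: Option[String])

    object QueryContextSketch extends App {
      // Hypothetical consumer: reads both fields through the single context value.
      def describe(qc: QueryContext): String =
        s"catalog=${qc.currentCatalog.getOrElse("<unset>")}, " +
          s"database=${qc.currentDatabase.getOrElse("<unset>")}"

      println(describe(QueryContext(Some("spark_catalog"), Some("default"))))
      // prints: catalog=spark_catalog, database=default
    }

Bundling the two values means a future addition to the execution context only touches the case class, not every signature between FlowResolver and FlowAnalysis.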

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala
Lines changed: 7 additions & 7 deletions

@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.pipelines.graph
 
+import scala.util.Try
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.classic.{DataFrame, DataStreamReader, Dataset, SparkSession}
+import org.apache.spark.sql.classic.{DataFrame, Dataset, DataStreamReader, SparkSession}
 import org.apache.spark.sql.pipelines.{AnalysisWarning, Language}
 import org.apache.spark.sql.pipelines.graph.GraphIdentifierManager.{ExternalDatasetIdentifier, InternalDatasetIdentifier}
 import org.apache.spark.sql.pipelines.util.{BatchReadOptions, InputReadOptions, StreamingReadOptions}
@@ -33,14 +35,12 @@ object FlowAnalysis {
       allInputs: Set[TableIdentifier],
       availableInputs: Seq[Input],
       confs: Map[String, String],
-      currentCatalog: Option[String],
-      currentDatabase: Option[String]
+      queryContext: QueryContext
   ): FlowFunctionResult = {
     val ctx = FlowAnalysisContext(
       allInputs = allInputs,
       availableInputs = availableInputs,
-      currentCatalog = currentCatalog,
-      currentDatabase = currentDatabase,
+      queryContext = queryContext,
       spark = SparkSession.active
     )
     val df = try {
@@ -51,8 +51,8 @@ object FlowAnalysis {
     }
     FlowFunctionResult(
       requestedInputs = ctx.requestedInputs.toSet,
-      usedBatchInputs = ctx.batchInputs.toSet,
-      usedStreamingInputs = ctx.streamingInputs.toSet,
+      batchInputs = ctx.batchInputs.toSet,
+      streamingInputs = ctx.streamingInputs.toSet,
       usedExternalInputs = ctx.externalInputs.toSet,
       dataFrame = df,
       sqlConf = confs,

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala
Lines changed: 2 additions & 4 deletions

@@ -29,8 +29,7 @@ import org.apache.spark.sql.pipelines.AnalysisWarning
  *
  * @param allInputs Set of identifiers for all [[Input]]s defined in the DataflowGraph.
  * @param availableInputs Inputs available to be referenced with `read` or `readStream`.
- * @param currentCatalog The current catalog in execution context when the query is defined.
- * @param currentDatabase The current schema in execution context when the query is defined.
+ * @param queryContext The context of the query being evaluated.
  * @param requestedInputs A mutable buffer populated with names of all inputs that were
  *                        requested.
  * @param spark the spark session to be used.
@@ -40,8 +39,7 @@ import org.apache.spark.sql.pipelines.AnalysisWarning
 private[pipelines] case class FlowAnalysisContext(
     allInputs: Set[TableIdentifier],
     availableInputs: Seq[Input],
-    currentCatalog: Option[String],
-    currentDatabase: Option[String],
+    queryContext: QueryContext,
     batchInputs: mutable.HashSet[ResolvedInput] = mutable.HashSet.empty,
     streamingInputs: mutable.HashSet[ResolvedInput] = mutable.HashSet.empty,
     requestedInputs: mutable.HashSet[TableIdentifier] = mutable.HashSet.empty,

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphIdentifierManager.scala
Lines changed: 2 additions & 2 deletions

@@ -85,8 +85,8 @@ object GraphIdentifierManager {
     } else {
       val fullyQualifiedInputIdentifier = fullyQualifyIdentifier(
         maybeFullyQualifiedIdentifier = inputIdentifier,
-        currentCatalog = context.currentCatalog,
-        currentDatabase = context.currentDatabase
+        currentCatalog = context.queryContext.currentCatalog,
+        currentDatabase = context.queryContext.currentDatabase
       )
       assertIsFullyQualifiedForRead(identifier = fullyQualifiedInputIdentifier)
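
fullyQualifyIdentifier itself is unchanged here; only the sourcing of its arguments moves behind queryContext. For intuition, a hypothetical sketch of what such qualification does with the context (the helper name and body are assumptions, not this file's implementation; it uses QueryContext from Flow.scala and assumes a Spark version whose catalyst TableIdentifier carries optional catalog and database fields):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Hypothetical: fill in whichever parts of the identifier the user omitted,
    // using the catalog/database captured in the QueryContext.
    def qualifySketch(id: TableIdentifier, qc: QueryContext): TableIdentifier =
      id.copy(
        catalog = id.catalog.orElse(qc.currentCatalog),
        database = id.database.orElse(qc.currentDatabase))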

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
Lines changed: 1 addition & 1 deletion

@@ -374,7 +374,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
         .getMessage
         .contains(
           s"View ${fullyQualifiedIdentifier("a", isView = true).quotedString}" +
-            s" is not a streaming view and must be referenced using read."
+            s" is not a batch view and must be referenced using read."
         )
     )
   }

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
Lines changed: 13 additions & 6 deletions

@@ -27,6 +27,7 @@ import org.apache.spark.sql.pipelines.graph.{
   GraphIdentifierManager,
   GraphRegistrationContext,
   PersistedView,
+  QueryContext,
   QueryOrigin,
   Table,
   TemporaryView,
@@ -86,10 +87,12 @@ class TestGraphRegistrationContext(
       identifier = tableIdentifier,
       destinationIdentifier = tableIdentifier,
       func = query.get,
+      queryContext = QueryContext(
+        currentCatalog = catalog.orElse(Some(defaultCatalog)),
+        currentDatabase = database.orElse(Some(defaultDatabase))
+      ),
       sqlConf = sqlConf,
       once = false,
-      currentCatalog = catalog.orElse(Some(defaultCatalog)),
-      currentDatabase = database.orElse(Some(defaultDatabase)),
       comment = comment,
       origin = baseOrigin
     )
@@ -138,10 +141,12 @@ class TestGraphRegistrationContext(
       identifier = viewIdentifier,
       destinationIdentifier = viewIdentifier,
       func = query,
+      queryContext = QueryContext(
+        currentCatalog = catalog.orElse(Some(defaultCatalog)),
+        currentDatabase = database.orElse(Some(defaultDatabase))
+      ),
       sqlConf = sqlConf,
       once = false,
-      currentCatalog = catalog.orElse(Some(defaultCatalog)),
-      currentDatabase = database.orElse(Some(defaultDatabase)),
       comment = comment,
       origin = origin
     )
@@ -165,10 +170,12 @@ class TestGraphRegistrationContext(
       identifier = flowIdentifier,
       destinationIdentifier = flowDestinationIdentifier,
       func = query,
+      queryContext = QueryContext(
+        currentCatalog = catalog.orElse(Some(defaultCatalog)),
+        currentDatabase = database.orElse(Some(defaultDatabase))
+      ),
       sqlConf = Map.empty,
       once = once,
-      currentCatalog = catalog.orElse(Some(defaultCatalog)),
-      currentDatabase = database.orElse(Some(defaultDatabase)),
       comment = None,
       origin = QueryOrigin()
     )
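
All three registration paths above build their QueryContext with the same rule: an explicitly supplied catalog or database wins, and the test context's defaults fill the gaps. A self-contained sketch of that orElse defaulting (the object name and default values are illustrative stand-ins, not the helper's actual fields):

    object DefaultingSketch extends App {
      case class QueryContext(
          currentCatalog: Option[String],
          currentDatabase: Option[String])

      val defaultCatalog = "spark_catalog"  // stand-in for the test context's default
      val defaultDatabase = "default"

      def mkContext(catalog: Option[String], database: Option[String]): QueryContext =
        QueryContext(
          currentCatalog = catalog.orElse(Some(defaultCatalog)),
          currentDatabase = database.orElse(Some(defaultDatabase)))

      // An explicit database overrides the default; the catalog falls back.
      assert(mkContext(None, Some("db1")) == QueryContext(Some("spark_catalog"), Some("db1")))
    }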
