
Commit 8941194

comments
1 parent 4405843 commit 8941194

8 files changed: 14 additions & 19 deletions

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 2 additions & 2 deletions
@@ -1957,7 +1957,7 @@
   },
   "INCOMPATIBLE_BATCH_VIEW_READ" : {
     "message" : [
-      "View <datasetIdentifier> is not a batch view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
+      "View <datasetIdentifier> is a batch view and must be referenced using SparkSession#read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
     ],
     "sqlState" : "42000"
   },
@@ -2039,7 +2039,7 @@
   },
   "INCOMPATIBLE_STREAMING_VIEW_READ" : {
     "message" : [
-      "View <datasetIdentifier> is a streaming view and must be referenced using readStream. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
+      "View <datasetIdentifier> is a streaming view and must be referenced using SparkSession#readStream. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
     ],
     "sqlState" : "42000"
   },
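For reference, a minimal sketch of the two read paths the updated messages now name explicitly (the view names below are illustrative, not from this commit):

    // Batch view: reference it through SparkSession#read
    val batchDf = spark.read.table("some_batch_view")

    // Streaming view: reference it through SparkSession#readStream
    val streamingDf = spark.readStream.table("some_streaming_view")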

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala

Lines changed: 1 addition & 9 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.StructType
  * It manages the relationships between logical flows, tables, and views, providing
  * operations for graph traversal, validation, and transformation.
  */
-class DataflowGraph(val flows: Seq[Flow], val tables: Seq[Table], val views: Seq[View])
+case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
   extends GraphOperations
   with GraphValidations {

@@ -130,14 +130,6 @@ class DataflowGraph(val flows: Seq[Flow], val tables: Seq[Table], val views: Seq
     }.toMap
   }

-  /** Returns a copy of this [[DataflowGraph]] with optionally replaced components. */
-  def copy(
-      flows: Seq[Flow] = flows,
-      tables: Seq[Table] = tables,
-      views: Seq[View] = views): DataflowGraph = {
-    new DataflowGraph(flows, tables, views)
-  }
-
   /**
    * Used to reanalyze the flow's DF for a given table. This is done by finding all upstream
    * flows (until a table is reached) for the specified source and reanalyzing all upstream
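Because DataflowGraph is now a case class, the compiler synthesizes copy (along with equals and hashCode), which is what makes the hand-written copy above safe to delete. A minimal sketch of the resulting usage, with simplified stand-ins for the element types:

    // Simplified stand-ins for the real Flow/Table/View types.
    case class Flow(name: String)
    case class Table(name: String)
    case class View(name: String)

    case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])

    val graph = DataflowGraph(Seq(Flow("f")), Seq(Table("t")), Seq(View("v")))
    // The synthesized copy replaces the removed hand-written one.
    val withExtraTable = graph.copy(tables = graph.tables :+ Table("t2"))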

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala

Lines changed: 1 addition & 1 deletion
@@ -97,7 +97,7 @@ class DataflowGraphTransformer(graph: DataflowGraph) extends AutoCloseable {
     flows.groupBy(_.destinationIdentifier)
   }

-  def transformTables(transformer: Table => Table): DataflowGraphTransformer = {
+  def transformTables(transformer: Table => Table): DataflowGraphTransformer = synchronized {
     tables = tables.map(transformer)
     tableMap = computeTableMap()
     this
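transformTables rewrites both tables and the derived tableMap, so wrapping the body in synchronized keeps the two fields consistent when callers hit the same transformer instance from multiple threads. A minimal sketch of the pattern, with simplified types:

    // Sketch only: both mutable fields are updated under the instance lock.
    class TransformerSketch(initial: Seq[String]) {
      private var tables: Seq[String] = initial
      private var tableMap: Map[String, Seq[String]] = computeTableMap()

      private def computeTableMap(): Map[String, Seq[String]] =
        tables.groupBy(_.toLowerCase)

      def transformTables(f: String => String): this.type = synchronized {
        tables = tables.map(f)
        tableMap = computeTableMap() // other threads never observe a stale map
        this
      }
    }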

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ case class FlowFunctionResult(
     requestedInputs: Set[TableIdentifier],
     batchInputs: Set[ResolvedInput],
     streamingInputs: Set[ResolvedInput],
-    usedExternalInputs: Set[String],
+    usedExternalInputs: Set[TableIdentifier],
     dataFrame: Try[DataFrame],
     sqlConf: Map[String, String],
     analysisWarnings: Seq[AnalysisWarning] = Nil) {

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala

Lines changed: 2 additions & 2 deletions
@@ -280,7 +280,7 @@ object FlowAnalysis {
       name: String): DataFrame = {

     val spark = context.spark
-    context.externalInputs += name
+    context.externalInputs += inputIdentifier.identifier
     spark.read.table(inputIdentifier.identifier.quotedString)
   }

@@ -298,7 +298,7 @@ object FlowAnalysis {
       streamReader: DataStreamReader,
       name: String): DataFrame = {

-    context.externalInputs += name
+    context.externalInputs += inputIdentifier.identifier
     streamReader.table(inputIdentifier.identifier.quotedString)
   }
 }

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ private[pipelines] case class FlowAnalysisContext(
     shouldLowerCaseNames: Boolean = false,
     analysisWarnings: mutable.Buffer[AnalysisWarning] = new ListBuffer[AnalysisWarning],
     spark: SparkSession,
-    externalInputs: mutable.HashSet[String] = mutable.HashSet.empty
+    externalInputs: mutable.HashSet[TableIdentifier] = mutable.HashSet.empty
 ) {

   /** Map from [[Input]] name to the actual [[Input]] */
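Tracking external inputs as resolved TableIdentifiers rather than raw name strings keeps the qualification explicit instead of depending on how the caller spelled the name. A hedged sketch with illustrative names:

    import scala.collection.mutable
    import org.apache.spark.sql.catalyst.TableIdentifier

    val externalInputs = mutable.HashSet.empty[TableIdentifier]
    externalInputs += TableIdentifier("events", Some("bronze"))
    externalInputs += TableIdentifier("events", Some("silver"))

    // Two distinct entries; a Set[String] of bare names would have kept only one.
    externalInputs.foreach(id => println(id.quotedString))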

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala

Lines changed: 4 additions & 1 deletion
@@ -74,7 +74,10 @@ trait Input extends GraphElement {
   def load(readOptions: InputReadOptions): DataFrame
 }

-/** Represents a node in a [[DataflowGraph]] that can be written to by a [[Flow]]. */
+/**
+ * Represents a node in a [[DataflowGraph]] that can be written to by a [[Flow]].
+ * Must be backed by a file source.
+ */
 sealed trait Output {

   /**

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -374,7 +374,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
         .getMessage
         .contains(
           s"View ${fullyQualifiedIdentifier("a", isView = true).quotedString}" +
-            s" is not a batch view and must be referenced using read."
+            s" is a batch view and must be referenced using SparkSession#read."
         )
     )
   }
@@ -392,7 +392,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
         .getMessage
         .contains(
           s"View ${fullyQualifiedIdentifier("a", isView = true).quotedString} " +
-            s"is a streaming view and must be referenced using readStream"
+            s"is a streaming view and must be referenced using SparkSession#readStream"
         )
     )
   }
