Commit c53c17c

1 parent 1dbc38c commit c53c17c

5 files changed: +26 −57 lines


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 1 addition & 1 deletion
@@ -2033,7 +2033,7 @@
   },
   "INCOMPATIBLE_BATCH_VIEW_READ": {
     "message": [
-      "View <datasetIdentifier> is not a streaming view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
+      "View <datasetIdentifier> is not a batch view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
     ],
     "sqlState": "42000"
   },
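For reference, a minimal sketch of the escape hatch the corrected message describes, assuming the named conf can be supplied like any other string conf at session build time (the builder call below is illustrative, not part of this commit):

import org.apache.spark.sql.SparkSession

// Sketch only: the error text above names this conf as the switch that
// disables the incompatible-view check; here it is set on a local session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("pipelines-view-check-demo")
  .config("pipelines.incompatibleViewCheck.enabled", "false")
  .getOrCreate()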

project/SparkBuild.scala

Lines changed: 2 additions & 2 deletions
@@ -896,12 +896,12 @@ object SparkDeclarativePipelines {
       val guavaVersion =
         SbtPomKeys.effectivePom.value.getProperties.get(
           "connect.guava.version").asInstanceOf[String]
-      val guavaFailureaccessVersion =
+      val guavaFailureAccessVersion =
         SbtPomKeys.effectivePom.value.getProperties.get(
           "guava.failureaccess.version").asInstanceOf[String]
       Seq(
         "com.google.guava" % "guava" % guavaVersion,
-        "com.google.guava" % "failureaccess" % guavaFailureaccessVersion,
+        "com.google.guava" % "failureaccess" % guavaFailureAccessVersion,
         "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf"
       )
     },

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/Language.scala

Lines changed: 4 additions & 2 deletions
@@ -19,6 +19,8 @@ package org.apache.spark.sql.pipelines
 
 sealed trait Language {}
 
-case class Python() extends Language {}
+object Language {
+  case class Python() extends Language {}
+  case class Sql() extends Language {}
+}
 
-case class Sql() extends Language {}
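As a rough illustration of what the nesting changes at call sites, a hedged sketch of matching on the trait once both variants live in the Language companion object (the describe helper is hypothetical, not from the commit):

import org.apache.spark.sql.pipelines.Language

// Hypothetical helper: with Python and Sql nested in the companion object,
// call sites reference them as Language.Python() / Language.Sql(), and the
// sealed trait still lets the compiler check the match for exhaustiveness.
def describe(lang: Language): String = lang match {
  case Language.Python() => "flow declared through the Python API"
  case Language.Sql()    => "flow declared through SQL"
}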

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala

Lines changed: 13 additions & 37 deletions
@@ -90,15 +90,15 @@ trait FlowFunction extends Logging {
 /**
  * Holds the [[DataFrame]] returned by a [[FlowFunction]] along with the inputs used to
  * construct it.
- * @param usedBatchInputs the identifiers of the complete inputs read by the flow
- * @param usedStreamingInputs the identifiers of the incremental inputs read by the flow
+ * @param batchInputs the complete inputs read by the flow
+ * @param streamingInputs the incremental inputs read by the flow
  * @param usedExternalInputs the identifiers of the external inputs read by the flow
  * @param dataFrame the [[DataFrame]] expression executed by the flow if the flow can be resolved
  */
 case class FlowFunctionResult(
     requestedInputs: Set[TableIdentifier],
-    usedBatchInputs: Set[ResolvedInput],
-    usedStreamingInputs: Set[ResolvedInput],
+    batchInputs: Set[ResolvedInput],
+    streamingInputs: Set[ResolvedInput],
     usedExternalInputs: Set[String],
     dataFrame: Try[DataFrame],
     sqlConf: Map[String, String],

@@ -113,12 +113,6 @@ case class FlowFunctionResult(
     (batchInputs ++ streamingInputs).map(_.input.identifier)
   }
 
-  /** Names of [[Input]]s read completely by this [[Flow]]. */
-  def batchInputs: Set[ResolvedInput] = usedBatchInputs
-
-  /** Names of [[Input]]s read incrementally by this [[Flow]]. */
-  def streamingInputs: Set[ResolvedInput] = usedStreamingInputs
-
   /** Returns errors that occurred when attempting to analyze this [[Flow]]. */
   def failure: Seq[Throwable] = {
     dataFrame.failed.toOption.toSeq

@@ -129,35 +123,17 @@ case class FlowFunctionResult(
 }
 
 /** A [[Flow]] whose output schema and dependencies aren't known. */
-class UnresolvedFlow(
-    val identifier: TableIdentifier,
-    val destinationIdentifier: TableIdentifier,
-    val func: FlowFunction,
-    val currentCatalog: Option[String],
-    val currentDatabase: Option[String],
-    val sqlConf: Map[String, String],
-    val comment: Option[String] = None,
+case class UnresolvedFlow(
+    identifier: TableIdentifier,
+    destinationIdentifier: TableIdentifier,
+    func: FlowFunction,
+    currentCatalog: Option[String],
+    currentDatabase: Option[String],
+    sqlConf: Map[String, String],
+    comment: Option[String] = None,
     override val once: Boolean,
     override val origin: QueryOrigin
-) extends Flow {
-  def copy(
-      identifier: TableIdentifier = identifier,
-      destinationIdentifier: TableIdentifier = destinationIdentifier,
-      sqlConf: Map[String, String] = sqlConf
-  ): UnresolvedFlow = {
-    new UnresolvedFlow(
-      identifier = identifier,
-      destinationIdentifier = destinationIdentifier,
-      func = func,
-      currentCatalog = currentCatalog,
-      currentDatabase = currentDatabase,
-      sqlConf = sqlConf,
-      comment = comment,
-      once = once,
-      origin = origin
-    )
-  }
-}
+) extends Flow
 
 /**
  * A [[Flow]] whose flow function has been invoked, meaning either:
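To make the deletion above concrete: once UnresolvedFlow is a case class, Scala synthesizes copy() with per-field defaults, which is what the hand-written copy method was emulating. A cut-down, self-contained analogue follows; MiniFlow and the field values are placeholders, not from the commit:

import org.apache.spark.sql.catalyst.TableIdentifier

// Placeholder analogue of UnresolvedFlow, only to show the synthesized copy().
case class MiniFlow(
    identifier: TableIdentifier,
    sqlConf: Map[String, String],
    once: Boolean = false)

val original = MiniFlow(TableIdentifier("events"), Map.empty)
// Override just the fields that change; everything else is carried over,
// mirroring what the removed copy(identifier, destinationIdentifier, sqlConf) did.
val rescoped = original.copy(sqlConf = Map("spark.sql.ansi.enabled" -> "true"))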

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala

Lines changed: 6 additions & 15 deletions
@@ -17,23 +17,14 @@
 
 package org.apache.spark.sql.pipelines.graph
 
-import scala.util.Try
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.classic.{DataFrame, Dataset, DataStreamReader, SparkSession}
-import org.apache.spark.sql.pipelines.{AnalysisWarning, Sql}
-import org.apache.spark.sql.pipelines.graph.GraphIdentifierManager.{
-  ExternalDatasetIdentifier,
-  InternalDatasetIdentifier
-}
-import org.apache.spark.sql.pipelines.util.{
-  BatchReadOptions,
-  InputReadOptions,
-  StreamingReadOptions
-}
+import org.apache.spark.sql.classic.{DataFrame, DataStreamReader, Dataset, SparkSession}
+import org.apache.spark.sql.pipelines.{AnalysisWarning, Language}
+import org.apache.spark.sql.pipelines.graph.GraphIdentifierManager.{ExternalDatasetIdentifier, InternalDatasetIdentifier}
+import org.apache.spark.sql.pipelines.util.{BatchReadOptions, InputReadOptions, StreamingReadOptions}
 
 object FlowAnalysis {
   def createFlowFunctionFromLogicalPlan(plan: LogicalPlan): FlowFunction = {

@@ -115,7 +106,7 @@ object FlowAnalysis {
           name = IdentifierHelper.toQuotedString(u.multipartIdentifier),
           spark.readStream,
           streamingReadOptions = StreamingReadOptions(
-            apiLanguage = Sql()
+            apiLanguage = Language.Sql()
           )
         ).queryExecution.analyzed
 
@@ -124,7 +115,7 @@ object FlowAnalysis {
         readBatchInput(
           context,
           name = IdentifierHelper.toQuotedString(u.multipartIdentifier),
-          batchReadOptions = BatchReadOptions(apiLanguage = Sql())
+          batchReadOptions = BatchReadOptions(apiLanguage = Language.Sql())
         ).queryExecution.analyzed
       }
       Dataset.ofRows(spark, resolvedPlan)
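Assuming the read-option constructors keep the shape shown in this hunk (constructible from apiLanguage alone), a non-SQL call site would select the other variant the same way; the snippet below is illustrative, not from the commit:

import org.apache.spark.sql.pipelines.Language
import org.apache.spark.sql.pipelines.util.{BatchReadOptions, StreamingReadOptions}

// Sketch: after this change, the only difference at such call sites is that the
// language variant is qualified through the Language companion object.
val streamingOpts = StreamingReadOptions(apiLanguage = Language.Python())
val batchOpts = BatchReadOptions(apiLanguage = Language.Python())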
