
Commit bbd48e4

Merge branch 'graph-resolution' of github.com:aakash-db/spark into graph-resolution
2 parents 16a7ab2 + 759fd5a

File tree

4 files changed: +24, -83 lines changed

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala

Lines changed: 10 additions & 12 deletions
@@ -34,11 +34,11 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
   extends GraphOperations
   with GraphValidations {

-  /** Returns a [[Output]] given its identifier */
+  /** Map of [[Output]]s by their identifiers */
   lazy val output: Map[TableIdentifier, Output] = mapUnique(tables, "output")(_.identifier)

   /**
-   * Returns [[Flow]]s in this graph that need to get planned and potentially executed when
+   * [[Flow]]s in this graph that need to get planned and potentially executed when
    * executing the graph. Flows that write to logical views are excluded.
    */
   lazy val materializedFlows: Seq[ResolvedFlow] = {
@@ -47,14 +47,14 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
     )
   }

-  /** Returns the identifiers of [[materializedFlows]]. */
+  /** The identifiers of [[materializedFlows]]. */
   val materializedFlowIdentifiers: Set[TableIdentifier] = materializedFlows.map(_.identifier).toSet

-  /** Returns a [[Table]] given its identifier */
+  /** Map of [[Table]]s by their identifiers */
   lazy val table: Map[TableIdentifier, Table] =
     mapUnique(tables, "table")(_.identifier)

-  /** Returns a [[Flow]] given its identifier */
+  /** Map of [[Flow]]s by their identifier */
   lazy val flow: Map[TableIdentifier, Flow] = {
     // Better error message than using mapUnique.
     val flowsByIdentifier = flows.groupBy(_.identifier)
@@ -89,20 +89,20 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
     flowsByIdentifier.view.mapValues(_.head).toMap
   }

-  /** Returns a [[View]] given its identifier */
+  /** Map of [[View]]s by their identifiers */
   lazy val view: Map[TableIdentifier, View] = mapUnique(views, "view")(_.identifier)

-  /** Returns the [[PersistedView]]s of the graph */
+  /** The [[PersistedView]]s of the graph */
   lazy val persistedViews: Seq[PersistedView] = views.collect {
     case v: PersistedView => v
   }

-  /** Returns all the [[Input]]s in the current DataflowGraph. */
+  /** All the [[Input]]s in the current DataflowGraph. */
   lazy val inputIdentifiers: Set[TableIdentifier] = {
     (flows ++ tables).map(_.identifier).toSet
   }

-  /** Returns the [[Flow]]s that write to a given destination. */
+  /** The [[Flow]]s that write to a given destination. */
   lazy val flowsTo: Map[TableIdentifier, Seq[Flow]] = flows.groupBy(_.destinationIdentifier)

   lazy val resolvedFlows: Seq[ResolvedFlow] = {
@@ -155,7 +155,7 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
   }

   /**
-   * Returns a map of the inferred schema of each table, computed by merging the analyzed schemas
+   * A map of the inferred schema of each table, computed by merging the analyzed schemas
    * of all flows writing to that table.
    */
   lazy val inferredSchema: Map[TableIdentifier, StructType] = {
@@ -191,8 +191,6 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
     validatePersistedViewSources()
     validateEveryDatasetHasFlow()
     validateTablesAreResettable()
-    validateAppendOnceFlows()
-    // Ensures that all flows are resolved and have a valid schema.
     inferredSchema
   }.failed

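The doc-comment edits in this file align the wording with what these members actually are (lazily evaluated maps and values, not lookup methods), and the final hunk drops validateAppendOnceFlows() from the validation chain; forcing inferredSchema remains the step that ensures every flow is resolved with a valid schema. For context, a minimal sketch of a mapUnique-style helper consistent with the call sites above; the signature and error message here are assumptions, and the real helper lives elsewhere in the pipelines module:

def mapUnique[K, V](elements: Seq[V], elementType: String)(keyFn: V => K): Map[K, V] = {
  // Group by the extracted key and reject duplicates with a descriptive error.
  val grouped = elements.groupBy(keyFn)
  grouped.find { case (_, vs) => vs.size > 1 }.foreach {
    case (key, _) =>
      throw new IllegalArgumentException(s"Found multiple ${elementType}s with identifier $key")
  }
  grouped.view.mapValues(_.head).toMap
}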
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala

Lines changed: 0 additions & 6 deletions
@@ -192,11 +192,5 @@ class AppendOnceFlow(
   val funcResult: FlowFunctionResult
 ) extends ResolvedFlow {

-  /**
-   * Whether the flow was declared as once or not in UnresolvedFlow. If false, then it means the
-   * flow is created from batch query.
-   */
-  val definedAsOnce: Boolean = flow.once
-
   override val once = true
 }

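This is the companion cleanup to the validation change below: the only consumer of definedAsOnce was the validateAppendOnceFlows() check deleted in GraphValidations.scala, which left the flag as dead code. A self-contained before/after sketch (illustrative types, not the real pipeline classes):

trait FlowSketch { def once: Boolean }

// Before: definedAsOnce recorded whether the flow was declared as once or was
// created from a batch query; only the removed validation ever read it.
final class AppendOnceFlowBefore(declaredOnce: Boolean) extends FlowSketch {
  val definedAsOnce: Boolean = declaredOnce
  override val once = true
}

// After: append-once flows are unconditionally once; the origin of the
// declaration is no longer tracked.
final class AppendOnceFlowAfter extends FlowSketch {
  override val once = true
}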
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala

Lines changed: 14 additions & 23 deletions
@@ -86,12 +86,12 @@ trait GraphValidations extends Logging {
   * Validate that all tables are resettable. This is a best-effort check that will only catch
   * upstream tables that are resettable but have a non-resettable downstream dependency.
   */
-  protected def validateTablesAreResettable(): Seq[GraphValidationWarning] = {
+  protected def validateTablesAreResettable(): Unit = {
     validateTablesAreResettable(tables)
   }

   /** Validate that all specified tables are resettable. */
-  protected def validateTablesAreResettable(tables: Seq[Table]): Seq[GraphValidationWarning] = {
+  protected def validateTablesAreResettable(tables: Seq[Table]): Unit = {
     val tableLookup = mapUnique(tables, "table")(_.identifier)
     val nonResettableTables =
       tables.filter(t => !PipelinesTableProperties.resetAllowed.fromMap(t.properties))
@@ -120,28 +120,19 @@
       .reverse
       .map {
         case (nameForEvent, tables) =>
-          InvalidResettableDependencyException(nameForEvent, tables)
-      }
-  }
-
-  /**
-   * Validate if we have any append only flows writing into a streaming table but was created
-   * from a batch query.
-   */
-  protected def validateAppendOnceFlows(): Seq[GraphValidationWarning] = {
-    flows
-      .filter {
-        case af: AppendOnceFlow => !af.definedAsOnce
-        case _ => false
-      }
-      .groupBy(_.destinationIdentifier)
-      .flatMap {
-        case (destination, flows) =>
-          table
-            .get(destination)
-            .map(t => AppendOnceFlowCreatedFromBatchQueryException(t, flows.map(_.identifier)))
+          throw new AnalysisException(
+            "INVALID_RESETTABLE_DEPENDENCY",
+            Map(
+              "downstreamTable" -> nameForEvent,
+              "upstreamResettableTables" -> tables
+                .map(_.displayName)
+                .sorted
+                .map(t => s"'$t'")
+                .mkString(", "),
+              "resetAllowedKey" -> PipelinesTableProperties.resetAllowed.key
+            )
+          )
       }
-      .toSeq
   }

   protected def validateUserSpecifiedSchemas(): Unit = {

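Two things change here beyond the Unit return types: the INVALID_RESETTABLE_DEPENDENCY error is now thrown inline rather than wrapped in a GraphValidationWarning, and validation fails fast on the first offending destination instead of collecting a Seq of warnings for all of them. A hedged sketch of what a caller observes; graph and the validate() entry point are assumptions, not part of this diff:

import org.apache.spark.sql.AnalysisException

try {
  graph.validate() // assumed entry point that runs validateTablesAreResettable()
} catch {
  case e: AnalysisException if e.errorClass.contains("INVALID_RESETTABLE_DEPENDENCY") =>
    // Message parameters are populated exactly as in the hunk above.
    val downstream = e.messageParameters("downstreamTable")
    val upstream = e.messageParameters("upstreamResettableTables")
    println(s"Reset blocked: '$downstream' depends on resettable upstream tables $upstream")
}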
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala

Lines changed: 0 additions & 42 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.pipelines.graph

 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier

@@ -79,13 +78,6 @@ case class UnresolvedPipelineException(
        |failures that precede this log.""".stripMargin
 )

-/** A validation error that can either be thrown as an exception or logged as a warning. */
-trait GraphValidationWarning extends Logging {
-
-  /** The exception to throw when this validation fails. */
-  protected def exception: AnalysisException
-}
-
 /**
  * Raised when there's a circular dependency in the current pipeline. That is, a downstream
  * table is referenced while creating a upstream table.
@@ -99,37 +91,3 @@ case class CircularDependencyException(
     s"Circular dependencies are not supported in a pipeline. Please remove the dependency " +
     s"between '${upstreamDataset.unquotedString}' and '${downstreamTable.unquotedString}'."
 )
-
-/**
- * Raised when some tables in the current pipeline are not resettable due to some non-resettable
- * downstream dependencies.
- */
-case class InvalidResettableDependencyException(originName: String, tables: Seq[Table])
-  extends GraphValidationWarning {
-  override def exception: AnalysisException = new AnalysisException(
-    "INVALID_RESETTABLE_DEPENDENCY",
-    Map(
-      "downstreamTable" -> originName,
-      "upstreamResettableTables" -> tables
-        .map(_.displayName)
-        .sorted
-        .map(t => s"'$t'")
-        .mkString(", "),
-      "resetAllowedKey" -> PipelinesTableProperties.resetAllowed.key
-    )
-  )
-}
-
-/**
- * Warn if the append once flows was declared from batch query if there was a run before.
- * Throw an exception if not.
- * @param table the streaming destination that contains Append Once flows declared with batch query.
- * @param flows the append once flows that are declared with batch query.
- */
-case class AppendOnceFlowCreatedFromBatchQueryException(table: Table, flows: Seq[TableIdentifier])
-  extends GraphValidationWarning {
-  override def exception: AnalysisException = new AnalysisException(
-    "APPEND_ONCE_FROM_BATCH_QUERY",
-    Map("table" -> table.displayName)
-  )
-}

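With both case classes deleted, the GraphValidationWarning trait has no remaining implementors, and the now-unused Logging import goes with it. The APPEND_ONCE_FROM_BATCH_QUERY error condition is retired outright, while INVALID_RESETTABLE_DEPENDENCY survives as the inline throw in GraphValidations.scala. The trait encoded a warn-or-throw decision point that fail-fast validation no longer needs; roughly the pattern being retired (illustrative types, not Spark's):

trait ValidationWarningSketch {
  // The error to surface when the validation fails.
  protected def exception: RuntimeException
  // Callers could report the problem softly...
  def logWarning(log: String => Unit): Unit = log(exception.getMessage)
  // ...or escalate it; the new code always escalates, inline at the call site.
  def raise(): Nothing = throw exception
}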