Skip to content

Commit b4cbf08

Browse files
committed
Remove validateAppendOnceFlows
1 parent d887aea commit b4cbf08

File tree

3 files changed

+0
-28
lines changed

3 files changed

+0
-28
lines changed

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,6 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
191191
validatePersistedViewSources()
192192
validateEveryDatasetHasFlow()
193193
validateTablesAreResettable()
194-
validateAppendOnceFlows()
195-
// Ensures that all flows are resolved and have a valid schema.
196194
inferredSchema
197195
}.failed
198196

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,5 @@ class AppendOnceFlow(
192192
val funcResult: FlowFunctionResult
193193
) extends ResolvedFlow {
194194

195-
/**
196-
* Whether the flow was declared as once or not in UnresolvedFlow. If false, then it means the
197-
* flow is created from batch query.
198-
*/
199-
val definedAsOnce: Boolean = flow.once
200-
201195
override val once = true
202196
}

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -124,26 +124,6 @@ trait GraphValidations extends Logging {
124124
}
125125
}
126126

127-
/**
128-
* Validate if we have any append only flows writing into a streaming table but was created
129-
* from a batch query.
130-
*/
131-
protected def validateAppendOnceFlows(): Seq[GraphValidationWarning] = {
132-
flows
133-
.filter {
134-
case af: AppendOnceFlow => !af.definedAsOnce
135-
case _ => false
136-
}
137-
.groupBy(_.destinationIdentifier)
138-
.flatMap {
139-
case (destination, flows) =>
140-
table
141-
.get(destination)
142-
.map(t => AppendOnceFlowCreatedFromBatchQueryException(t, flows.map(_.identifier)))
143-
}
144-
.toSeq
145-
}
146-
147127
protected def validateUserSpecifiedSchemas(): Unit = {
148128
flows.flatMap(f => table.get(f.identifier)).foreach { t: TableInput =>
149129
// The output inferred schema of a table is the declared schema merged with the

0 commit comments

Comments
 (0)