Skip to content

Commit 6f1e58c

Browse files
committed
1
1 parent 8941194 commit 6f1e58c

File tree

3 files changed

+3
-10
lines changed

3 files changed

+3
-10
lines changed

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,8 @@ class CoreDataflowNodeProcessor(rawGraph: DataflowGraph) {
8787
isStreamingTableOpt = Option(resolvedFlowsToTable.exists(f => f.df.isStreaming))
8888
)
8989

90-
// Table will be virtual in either of the following scenarios:
91-
// 1. If table is present in context.fullRefreshTables
92-
// 2. If table has any virtual inputs (flows or tables)
93-
// 3. If the table pre-existing metadata is different from current metadata
90+
// We mark all tables as virtual to ensure resolution uses incoming flows
91+
// rather than previously materialized tables.
9492
val virtualTableInput = VirtualTableInput(
9593
identifier = table.identifier,
9694
specifiedSchema = table.specifiedSchema,

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,6 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
3737
/** Returns a [[Output]] given its identifier */
3838
lazy val output: Map[TableIdentifier, Output] = mapUnique(tables, "output")(_.identifier)
3939

40-
/**
41-
* Returns a [[TableInput]], if one is available, that can be read from by downstream flows.
42-
*/
43-
def tableInput(identifier: TableIdentifier): Option[TableInput] = table.get(identifier)
44-
4540
/**
4641
* Returns [[Flow]]s in this graph that need to get planned and potentially executed when
4742
* executing the graph. Flows that write to logical views are excluded.

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ trait GraphValidations extends Logging {
148148
}
149149

150150
protected def validateUserSpecifiedSchemas(): Unit = {
151-
flows.flatMap(f => tableInput(f.identifier)).foreach { t: TableInput =>
151+
flows.flatMap(f => table.get(f.identifier)).foreach { t: TableInput =>
152152
// The output inferred schema of a table is the declared schema merged with the
153153
// schema of all incoming flows. This must be equivalent to the declared schema.
154154
val inferredSchema = SchemaInferenceUtils

0 commit comments

Comments
 (0)