Skip to content

Commit da6bc91

Browse files
committed
1 parent ae8da43 · commit da6bc91

File tree

5 files changed

+10
-40
lines changed

5 files changed

+10
-40
lines changed

sql/pipelines/pom.xml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@
3838
<artifactId>spark-core_${scala.binary.version}</artifactId>
3939
<version>${project.version}</version>
4040
</dependency>
41+
<dependency>
42+
<groupId>org.apache.spark</groupId>
43+
<artifactId>spark-core_${scala.binary.version}</artifactId>
44+
<version>${project.version}</version>
45+
<type>test-jar</type>
46+
<scope>test</scope>
47+
</dependency>
4148
<dependency>
4249
<groupId>org.apache.spark</groupId>
4350
<artifactId>spark-sql_${scala.binary.version}</artifactId>
@@ -49,13 +56,6 @@
4956
</exclusion>
5057
</exclusions>
5158
</dependency>
52-
<dependency>
53-
<groupId>org.apache.spark</groupId>
54-
<artifactId>spark-core_${scala.binary.version}</artifactId>
55-
<version>${project.version}</version>
56-
<type>test-jar</type>
57-
<scope>test</scope>
58-
</dependency>
5959
<dependency>
6060
<groupId>org.apache.spark</groupId>
6161
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,9 @@ private class FlowResolver(rawGraph: DataflowGraph) extends Logging {
183183
}
184184
convertResolvedToTypedFlow(flowToResolve, maybeNewFuncResult)
185185

186-
// If flow failed due to unresolved dataset, throw a retryable exception, otherwise just
187-
// return the failed flow.
186+
// If the flow failed due to an UnresolvedDatasetException, it means that one of the
187+
// flow's inputs wasn't available. After other flows are resolved, these inputs
188+
// may become available, so throw a retryable exception in this case.
188189
case f =>
189190
f.dataFrame.failed.toOption.collectFirst {
190191
case e: UnresolvedDatasetException => e

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ sealed trait TableInput extends Input {
113113
* @param normalizedPath Normalized storage location for the table based on the user-specified table
114114
* path (if not defined, we will normalize a managed storage path for it).
115115
* @param properties Table Properties to set in table metadata.
116-
* @param sqlText For SQL-defined pipelines, the original string of the SELECT query.
117116
* @param comment User-specified comment that can be placed on the table.
118117
* @param isStreamingTableOpt if the table is a streaming table, will be None until we have resolved
119118
* flows into table
@@ -124,7 +123,6 @@ case class Table(
124123
partitionCols: Option[Seq[String]],
125124
normalizedPath: Option[String],
126125
properties: Map[String, String] = Map.empty,
127-
sqlText: Option[String],
128126
comment: Option[String],
129127
baseOrigin: QueryOrigin,
130128
isStreamingTableOpt: Option[Boolean],
@@ -239,9 +237,6 @@ trait View extends GraphElement {
239237
/** Properties of this view */
240238
val properties: Map[String, String]
241239

242-
/** (SQL-specific) The raw query that defines the [[View]]. */
243-
val sqlText: Option[String]
244-
245240
/** User-specified comment that can be placed on the [[View]]. */
246241
val comment: Option[String]
247242
}
@@ -251,13 +246,11 @@ trait View extends GraphElement {
251246
*
252247
* @param identifier The identifier of this view within the graph.
253248
* @param properties Properties of the view
254-
* @param sqlText Raw SQL query that defines the view.
255249
* @param comment when defining a view
256250
*/
257251
case class TemporaryView(
258252
identifier: TableIdentifier,
259253
properties: Map[String, String],
260-
sqlText: Option[String],
261254
comment: Option[String],
262255
origin: QueryOrigin
263256
) extends View {}
@@ -267,13 +260,11 @@ case class TemporaryView(
267260
*
268261
* @param identifier The identifier of this view within the graph.
269262
* @param properties Properties of the view
270-
* @param sqlText Raw SQL query that defines the view.
271263
* @param comment when defining a view
272264
*/
273265
case class PersistedView(
274266
identifier: TableIdentifier,
275267
properties: Map[String, String],
276-
sqlText: Option[String],
277268
comment: Option[String],
278269
origin: QueryOrigin
279270
) extends View {}

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -92,23 +92,6 @@ abstract class PipelineTest
9292
}
9393
}
9494

95-
/**
96-
* Runs the given function with the given spark conf, and resets the conf after the function
97-
* completes.
98-
*/
99-
def withSparkConfs[T](confs: Map[String, String])(f: => T): T = {
100-
val originalConfs = confs.keys.map(k => k -> spark.conf.getOption(k)).toMap
101-
confs.foreach { case (k, v) => spark.conf.set(k, v) }
102-
try f
103-
finally originalConfs.foreach {
104-
case (k, v) =>
105-
v match {
106-
case Some(v) => spark.conf.set(k, v)
107-
case None => spark.conf.unset(k)
108-
}
109-
}
110-
}
111-
11295
/**
11396
* This exists temporarily for compatibility with tests that become invalid when multiple
11497
* executors are available.

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ class TestGraphRegistrationContext(
5555
query: Option[FlowFunction] = None,
5656
sqlConf: Map[String, String] = Map.empty,
5757
comment: Option[String] = None,
58-
sqlText: Option[String] = None,
5958
specifiedSchema: Option[StructType] = None,
6059
partitionCols: Option[Seq[String]] = None,
6160
properties: Map[String, String] = Map.empty,
@@ -70,7 +69,6 @@ class TestGraphRegistrationContext(
7069
Table(
7170
identifier = GraphIdentifierManager.parseTableIdentifier(name, spark),
7271
comment = comment,
73-
sqlText = sqlText,
7472
specifiedSchema = specifiedSchema,
7573
partitionCols = partitionCols,
7674
properties = properties,
@@ -105,7 +103,6 @@ class TestGraphRegistrationContext(
105103
query: FlowFunction,
106104
sqlConf: Map[String, String] = Map.empty,
107105
comment: Option[String] = None,
108-
sqlText: Option[String] = None,
109106
origin: QueryOrigin = QueryOrigin.empty,
110107
viewType: ViewType = LocalTempView,
111108
catalog: Option[String] = None,
@@ -121,15 +118,13 @@ class TestGraphRegistrationContext(
121118
TemporaryView(
122119
identifier = viewIdentifier,
123120
comment = comment,
124-
sqlText = sqlText,
125121
origin = origin,
126122
properties = Map.empty
127123
)
128124
case _ =>
129125
PersistedView(
130126
identifier = viewIdentifier,
131127
comment = comment,
132-
sqlText = sqlText,
133128
origin = origin,
134129
properties = Map.empty
135130
)

0 commit comments

Comments (0)