Skip to content

Commit 6b20fa0

Browse files
committed
Clean up PipelinesErrors
1 parent b4cbf08 commit 6b20fa0

File tree

2 files changed

+18
-52
lines changed

2 files changed

+18
-52
lines changed

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,12 @@ trait GraphValidations extends Logging {
8686
* Validate that all tables are resettable. This is a best-effort check that will only catch
8787
* upstream tables that are resettable but have a non-resettable downstream dependency.
8888
*/
89-
protected def validateTablesAreResettable(): Seq[GraphValidationWarning] = {
89+
protected def validateTablesAreResettable(): Unit = {
9090
validateTablesAreResettable(tables)
9191
}
9292

9393
/** Validate that all specified tables are resettable. */
94-
protected def validateTablesAreResettable(tables: Seq[Table]): Seq[GraphValidationWarning] = {
94+
protected def validateTablesAreResettable(tables: Seq[Table]): Unit = {
9595
val tableLookup = mapUnique(tables, "table")(_.identifier)
9696
val nonResettableTables =
9797
tables.filter(t => !PipelinesTableProperties.resetAllowed.fromMap(t.properties))
@@ -120,7 +120,18 @@ trait GraphValidations extends Logging {
120120
.reverse
121121
.map {
122122
case (nameForEvent, tables) =>
123-
InvalidResettableDependencyException(nameForEvent, tables)
123+
throw new AnalysisException(
124+
"INVALID_RESETTABLE_DEPENDENCY",
125+
Map(
126+
"downstreamTable" -> nameForEvent,
127+
"upstreamResettableTables" -> tables
128+
.map(_.displayName)
129+
.sorted
130+
.map(t => s"'$t'")
131+
.mkString(", "),
132+
"resetAllowedKey" -> PipelinesTableProperties.resetAllowed.key
133+
)
134+
)
124135
}
125136
}
126137

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala

Lines changed: 4 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql.pipelines.graph
1919

20-
import org.apache.spark.SparkException
21-
import org.apache.spark.internal.Logging
2220
import org.apache.spark.sql.AnalysisException
2321
import org.apache.spark.sql.catalyst.TableIdentifier
2422

@@ -39,12 +37,10 @@ case class UnresolvedDatasetException(identifier: TableIdentifier)
3937
* @param name The name of the table
4038
* @param cause The cause of the failure
4139
*/
42-
case class LoadTableException(name: String, cause: Option[Throwable])
43-
extends SparkException(
44-
errorClass = "INTERNAL_ERROR",
45-
messageParameters = Map("message" -> s"Failed to load table '$name'"),
46-
cause = cause.orNull
47-
)
40+
case class LoadTableException(name: String, override val cause: Option[Throwable])
41+
extends AnalysisException(s"Failed to load table '$name'", cause = cause)
42+
43+
4844

4945
/**
5046
* Exception raised when a pipeline has one or more flows that cannot be resolved
@@ -79,13 +75,6 @@ case class UnresolvedPipelineException(
7975
|failures that precede this log.""".stripMargin
8076
)
8177

82-
/** A validation error that can either be thrown as an exception or logged as a warning. */
83-
trait GraphValidationWarning extends Logging {
84-
85-
/** The exception to throw when this validation fails. */
86-
protected def exception: AnalysisException
87-
}
88-
8978
/**
9079
* Raised when there's a circular dependency in the current pipeline. That is, a downstream
9180
* table is referenced while creating a upstream table.
@@ -99,37 +88,3 @@ case class CircularDependencyException(
9988
s"Circular dependencies are not supported in a pipeline. Please remove the dependency " +
10089
s"between '${upstreamDataset.unquotedString}' and '${downstreamTable.unquotedString}'."
10190
)
102-
103-
/**
104-
* Raised when some tables in the current pipeline are not resettable due to some non-resettable
105-
* downstream dependencies.
106-
*/
107-
case class InvalidResettableDependencyException(originName: String, tables: Seq[Table])
108-
extends GraphValidationWarning {
109-
override def exception: AnalysisException = new AnalysisException(
110-
"INVALID_RESETTABLE_DEPENDENCY",
111-
Map(
112-
"downstreamTable" -> originName,
113-
"upstreamResettableTables" -> tables
114-
.map(_.displayName)
115-
.sorted
116-
.map(t => s"'$t'")
117-
.mkString(", "),
118-
"resetAllowedKey" -> PipelinesTableProperties.resetAllowed.key
119-
)
120-
)
121-
}
122-
123-
/**
124-
* Warn if the append once flows was declared from batch query if there was a run before.
125-
* Throw an exception if not.
126-
* @param table the streaming destination that contains Append Once flows declared with batch query.
127-
* @param flows the append once flows that are declared with batch query.
128-
*/
129-
case class AppendOnceFlowCreatedFromBatchQueryException(table: Table, flows: Seq[TableIdentifier])
130-
extends GraphValidationWarning {
131-
override def exception: AnalysisException = new AnalysisException(
132-
"APPEND_ONCE_FROM_BATCH_QUERY",
133-
Map("table" -> table.displayName)
134-
)
135-
}

0 commit comments

Comments
 (0)