Commit 41a0600

aakash-db authored and sryza committed
fix
1 parent 71c69af commit 41a0600

12 files changed: +143 -236 lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 45 additions & 0 deletions
@@ -82,6 +82,14 @@
     ],
     "sqlState" : "XX000"
   },
+  "APPEND_ONCE_FROM_BATCH_QUERY" : {
+    "message" : [
+      "Creating a streaming table from a batch query prevents incremental loading of new data from source. Offending table: '<table>'.",
+      "Please use the stream() operator. Example usage:",
+      "CREATE STREAMING TABLE <target table name> ... AS SELECT ... FROM stream(<source table name>) ..."
+    ],
+    "sqlState" : "42000"
+  },
   "ARITHMETIC_OVERFLOW" : {
     "message" : [
       "<message>.<alternative> If necessary set <config> to \"false\" to bypass this error."
@@ -3137,6 +3145,12 @@
     },
     "sqlState" : "KD002"
   },
+  "INVALID_NAME_IN_USE_COMMAND" : {
+    "message" : [
+      "Invalid name '<name>' in <command> command. Reason: <reason>"
+    ],
+    "sqlState" : "42000"
+  },
   "INVALID_NON_DETERMINISTIC_EXPRESSIONS" : {
     "message" : [
       "The operator expects a deterministic expression, but the actual expression is <sqlExprs>."
@@ -3402,6 +3416,12 @@
     ],
     "sqlState" : "22023"
   },
+  "INVALID_RESETTABLE_DEPENDENCY" : {
+    "message" : [
+      "Tables <upstreamResettableTables> are resettable but have a non-resettable downstream dependency '<downstreamTable>'. `reset` will fail as Spark Streaming does not support deleted source data. You can either remove the <resetAllowedKey>=false property from '<downstreamTable>' or add it to its upstream dependencies."
+    ],
+    "sqlState" : "42000"
+  },
   "INVALID_RESET_COMMAND_FORMAT" : {
     "message" : [
       "Expected format is 'RESET' or 'RESET key'. If you want to include special characters in key, please use quotes, e.g., RESET `key`."
@@ -4587,6 +4607,12 @@
     ],
     "sqlState" : "42K03"
   },
+  "PERSISTED_VIEW_READS_FROM_TEMPORARY_VIEW" : {
+    "message" : [
+      "Persisted view <persistedViewName> cannot reference temporary view <temporaryViewName> that will not be available outside the pipeline scope. Either make the persisted view temporary or persist the temporary view."
+    ],
+    "sqlState" : "42K0F"
+  },
   "PIPE_OPERATOR_AGGREGATE_EXPRESSION_CONTAINS_NO_AGGREGATE_FUNCTION" : {
     "message" : [
       "Non-grouping expression <expr> is provided as an argument to the |> AGGREGATE pipe operator but does not contain any aggregate function; please update it to include an aggregate function and then retry the query again."
@@ -5443,6 +5469,19 @@
     ],
     "sqlState" : "42KD9"
   },
+  "UNABLE_TO_INFER_PIPELINE_TABLE_SCHEMA" : {
+    "message" : [
+      "Failed to infer the schema for table <tableName> from its upstream flows.",
+      "Please modify the flows that write to this table to make their schemas compatible.",
+      "",
+      "Inferred schema so far:",
+      "<inferredDataSchema>",
+      "",
+      "Incompatible schema:",
+      "<incompatibleDataSchema>"
+    ],
+    "sqlState" : "42KD9"
+  },
   "UNBOUND_SQL_PARAMETER" : {
     "message" : [
       "Found the unbound parameter: <name>. Please, fix `args` and provide a mapping of the parameter to either a SQL literal or collection constructor functions such as `map()`, `array()`, `struct()`."
@@ -5608,6 +5647,12 @@
     ],
     "sqlState" : "42883"
   },
+  "UNRESOLVED_TABLE_PATH" : {
+    "message" : [
+      "Storage path for table <identifier> cannot be resolved."
+    ],
+    "sqlState" : "22KD1"
+  },
   "UNRESOLVED_USING_COLUMN_FOR_JOIN" : {
     "message" : [
       "USING column <colName> cannot be resolved on the <side> side of the join. The <side>-side columns: [<suggestion>]."

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala

Lines changed: 5 additions & 2 deletions
@@ -192,12 +192,15 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
     validateEveryDatasetHasFlow()
     validateTablesAreResettable()
     validateAppendOnceFlows()
+    // Ensures that all flows are resolved and have a valid schema.
     inferredSchema
   }.failed
 
-  /** Enforce every dataset has at least once input flow. For example its possible to define
+  /**
+   * Enforce every dataset has at least one input flow. For example its possible to define
    * streaming tables without a query; such tables should still have at least one flow
-   * writing to it. */
+   * writing to it.
+   */
   def validateEveryDatasetHasFlow(): Unit = {
     (tables.map(_.identifier) ++ views.map(_.identifier)).foreach { identifier =>
       if (!flows.exists(_.destinationIdentifier == identifier)) {
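Note: the reworked scaladoc states the invariant that validateEveryDatasetHasFlow enforces. A self-contained sketch of that check, with simplified stand-in types and a plain exception in place of the pipeline's real error:

    // Stand-in types; the real check runs over the graph's tables, views and flows.
    case class Id(name: String)
    case class SimpleFlow(identifier: Id, destinationIdentifier: Id)

    def validateEveryDatasetHasFlow(datasets: Seq[Id], flows: Seq[SimpleFlow]): Unit = {
      datasets.foreach { identifier =>
        if (!flows.exists(_.destinationIdentifier == identifier)) {
          // The pipeline raises a dedicated error here; IllegalStateException is a stand-in.
          throw new IllegalStateException(
            s"Dataset '${identifier.name}' is defined but has no flow writing to it.")
        }
      }
    }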

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala

Lines changed: 8 additions & 4 deletions
@@ -42,7 +42,7 @@ import org.apache.spark.util.ThreadUtils
  * Assumptions:
  * 1. Each output will have at-least 1 flow to it.
  * 2. Each flow may or may not have a destination table. If a flow does not have a destination
- *    table, the destination is a view.
+ *    table, the destination is a temporary view.
  *
  * The way graph is structured is that flows, tables and sinks all are graph elements or nodes.
  * While we expose transformation functions for each of these entities, we also expose a way to
@@ -66,8 +66,7 @@ class DataflowGraphTransformer(graph: DataflowGraph) extends AutoCloseable {
   // Failed flows are flows that are failed to resolve or its inputs are not available or its
   // destination failed to resolve.
   private var failedFlows: Seq[ResolutionCompletedFlow] = Seq.empty
-  // We define a dataset is failed to resolve if:
-  // 1. It is a destination of a flow that is unresolved.
+  // We define a dataset is failed to resolve if it is a destination of a flow that is unresolved.
   private var failedTables: Seq[Table] = Seq.empty
 
   private val parallelism = 10
@@ -341,7 +340,7 @@
 object DataflowGraphTransformer {
 
   /**
-   * Exception thrown when a node in the graph fails to be transformed because at least one of its
+   * Exception thrown when transforming a node in the graph fails because at least one of its
    * dependencies weren't yet transformed.
    *
    * @param datasetIdentifier The identifier for an untransformed dependency table identifier in the
@@ -353,6 +352,11 @@
     extends Exception
     with NoStackTrace
 
+  /**
+   * Exception thrown when transforming a node in the graph fails with a non-retryable error.
+   *
+   * @param failedNode The failed node that could not be transformed.
+   */
   case class TransformNodeFailedException(failedNode: ResolutionFailedFlow)
     extends Exception
     with NoStackTrace
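Note: the new scaladoc distinguishes a retryable "dependency not yet transformed" failure from a non-retryable one. A self-contained sketch of how a caller might react to the two cases; only TransformNodeFailedException is named in this hunk, so the exception types and retry policy below are illustrative stand-ins:

    // Stand-ins mirroring the two documented failure modes.
    case class RetryableDependencyNotReady(dataset: String) extends Exception
    case class NodeFailed(node: String) extends Exception

    def transformWithRetry(transformOnce: () => Unit, maxAttempts: Int = 3): Unit = {
      var attempts = 0
      var done = false
      while (!done) {
        attempts += 1
        try {
          transformOnce()
          done = true
        } catch {
          case _: RetryableDependencyNotReady if attempts < maxAttempts =>
            // A dependency has not been transformed yet; try again on a later pass.
          case e: NodeFailed =>
            // Non-retryable: surface the failed node to the caller immediately.
            throw e
        }
      }
    }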

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala

Lines changed: 5 additions & 9 deletions
@@ -29,9 +29,7 @@ import org.apache.spark.sql.types.StructType
 /**
  * Contains the catalog and database context information for query execution.
  */
-case class QueryContext(
-    currentCatalog: Option[String],
-    currentDatabase: Option[String])
+case class QueryContext(currentCatalog: Option[String], currentDatabase: Option[String])
 
 /**
  * A [[Flow]] is a node of data transformation in a dataflow graph. It describes the movement
@@ -45,9 +43,7 @@ trait Flow extends GraphElement with Logging {
   val identifier: TableIdentifier
 
   /**
-   * The dataset that this Flow represents a write to. Since the DataflowGraph doesn't have a first-
-   * class concept of views, writing to a destination that isn't a Table or a Sink represents a
-   * view.
+   * The dataset that this Flow represents a write to.
    */
   val destinationIdentifier: TableIdentifier
 
@@ -65,7 +61,7 @@ trait Flow extends GraphElement with Logging {
   def sqlConf: Map[String, String]
 }
 
-/** A wrapper for a resolved internal input that includes the identifier used in SubqueryAlias */
+/** A wrapper for a resolved internal input that includes the alias provided by the user. */
 case class ResolvedInput(input: Input, aliasIdentifier: AliasIdentifier)
 
 /** A wrapper for the lambda function that defines a [[Flow]]. */
@@ -90,12 +86,12 @@ trait FlowFunction extends Logging {
 }
 
 /**
- * Holds the [[DataFrame]] returned by a [[FlowFunction]] along with the inputs used to
+ * Holds the DataFrame returned by a [[FlowFunction]] along with the inputs used to
  * construct it.
  * @param batchInputs the complete inputs read by the flow
  * @param streamingInputs the incremental inputs read by the flow
  * @param usedExternalInputs the identifiers of the external inputs read by the flow
- * @param dataFrame the [[DataFrame]] expression executed by the flow if the flow can be resolved
+ * @param dataFrame the DataFrame expression executed by the flow if the flow can be resolved
  */
 case class FlowFunctionResult(
     requestedInputs: Set[TableIdentifier],
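Note: QueryContext is now a single-line case class. Constructing one is straightforward (the values below are illustrative):

    // QueryContext simply carries the catalog/database that were current when the query was defined.
    val context = QueryContext(currentCatalog = Some("spark_catalog"), currentDatabase = Some("default"))

    // Either side can be absent, e.g. when no database has been selected yet.
    val catalogOnly = QueryContext(currentCatalog = Some("spark_catalog"), currentDatabase = None)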

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala

Lines changed: 13 additions & 4 deletions
@@ -28,7 +28,18 @@ import org.apache.spark.sql.pipelines.{AnalysisWarning, Language}
 import org.apache.spark.sql.pipelines.graph.GraphIdentifierManager.{ExternalDatasetIdentifier, InternalDatasetIdentifier}
 import org.apache.spark.sql.pipelines.util.{BatchReadOptions, InputReadOptions, StreamingReadOptions}
 
+
 object FlowAnalysis {
+  /**
+   * Creates a [[FlowFunction]] that attempts to analyze the provided LogicalPlan
+   * using the existing resolved inputs.
+   * - If all upstream inputs have been resolved, then analysis succeeds and the
+   *   function returns a [[FlowFunctionResult]] containing the dataframe.
+   * - If any upstream inputs are unresolved, then the function throws an exception.
+   *
+   * @param plan The user-supplied LogicalPlan defining a flow.
+   * @return A FlowFunction that attempts to analyze the provided LogicalPlan.
+   */
   def createFlowFunctionFromLogicalPlan(plan: LogicalPlan): FlowFunction = {
     new FlowFunction {
       override def call(
@@ -105,17 +116,15 @@
             context,
             name = IdentifierHelper.toQuotedString(u.multipartIdentifier),
             spark.readStream,
-            streamingReadOptions = StreamingReadOptions(
-              apiLanguage = Language.Sql()
-            )
+            streamingReadOptions = StreamingReadOptions()
           ).queryExecution.analyzed
 
         // Batch read on another dataset in the pipeline
         case u: UnresolvedRelation =>
           readBatchInput(
             context,
             name = IdentifierHelper.toQuotedString(u.multipartIdentifier),
-            batchReadOptions = BatchReadOptions(apiLanguage = Language.Sql())
+            batchReadOptions = BatchReadOptions()
           ).queryExecution.analyzed
       }
       Dataset.ofRows(spark, resolvedPlan)
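Note: the new scaladoc spells out the contract of createFlowFunctionFromLogicalPlan: analysis only succeeds once every upstream input is resolved. A hedged usage sketch (the SparkSession setup and the table name raw_events are illustrative, not from the commit):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.pipelines.graph.FlowAnalysis

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val plan: LogicalPlan =
      spark.sessionState.sqlParser.parsePlan("SELECT id, value FROM raw_events")

    // Nothing is analyzed yet; analysis happens when call() is invoked with the
    // currently resolved inputs, and succeeds only once `raw_events` is resolved.
    val flowFunction = FlowAnalysis.createFlowFunctionFromLogicalPlan(plan)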

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphErrors.scala

Lines changed: 30 additions & 5 deletions
@@ -26,12 +26,15 @@ import org.apache.spark.sql.types.StructType
 /** Collection of errors that can be thrown during graph resolution / analysis. */
 object GraphErrors {
 
+  /**
+   * Throws when a dataset is marked as internal but is not defined in the graph.
+   *
+   * @param datasetName the name of the dataset that is not defined
+   */
   def pipelineLocalDatasetNotDefinedError(datasetName: String): SparkException = {
-    // TODO: this should be an internal error, as we never expect this to happen
-    new SparkException(
-      errorClass = "PIPELINE_LOCAL_DATASET_NOT_DEFINED",
-      messageParameters = Map("datasetName" -> datasetName),
-      cause = null
+    SparkException.internalError(
+      s"Failed to read dataset '$datasetName'. This dataset was expected to be " +
+        s"defined and created by the pipeline."
     )
   }
 
@@ -54,6 +57,11 @@
     )
   }
 
+  /**
+   * Throws when a table path is unresolved, i.e. the table identifier does not exist in the catalog.
+   *
+   * @param identifier the unresolved table identifier
+   */
   def unresolvedTablePath(identifier: TableIdentifier): SparkException = {
     new SparkException(
       errorClass = "UNRESOLVED_TABLE_PATH",
@@ -62,6 +70,11 @@
     )
   }
 
+  /**
+   * Throws an error if the user-specified schema and the inferred schema are not compatible.
+   *
+   * @param tableIdentifier the identifier of the table that was not found
+   */
   def incompatibleUserSpecifiedAndInferredSchemasError(
       tableIdentifier: TableIdentifier,
       datasetType: DatasetType,
@@ -92,6 +105,12 @@
     )
   }
 
+  /**
+   * Throws if the latest inferred schema for a pipeline table is not compatible with
+   * the table's existing schema.
+   *
+   * @param tableIdentifier the identifier of the table that was not found
+   */
   def unableToInferSchemaError(
       tableIdentifier: TableIdentifier,
       inferredSchema: StructType,
@@ -109,6 +128,12 @@
     )
   }
 
+  /**
+   * Throws an error when a persisted view is trying to read from a temporary view.
+   *
+   * @param persistedViewIdentifier the identifier of the persisted view
+   * @param temporaryViewIdentifier the identifier of the temporary view
+   */
   def persistedViewReadsFromTemporaryView(
       persistedViewIdentifier: TableIdentifier,
       temporaryViewIdentifier: TableIdentifier): AnalysisException = {
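Note: pipelineLocalDatasetNotDefinedError now reports an internal error instead of the removed PIPELINE_LOCAL_DATASET_NOT_DEFINED class, reflecting that it signals a pipeline bug rather than a user mistake. A small sketch of the observable difference (the dataset name is illustrative):

    import org.apache.spark.SparkException

    // SparkException.internalError wraps the message in the INTERNAL_ERROR error class.
    val e: SparkException = SparkException.internalError(
      "Failed to read dataset 'events_cleaned'. This dataset was expected to be " +
        "defined and created by the pipeline.")
    assert(e.getErrorClass == "INTERNAL_ERROR")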

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala

Lines changed: 1 addition & 4 deletions
@@ -30,10 +30,7 @@ trait GraphValidations extends Logging {
   this: DataflowGraph =>
 
   /**
-   * Validate multi query table correctness. Exposed for Python unit testing, which currently cannot
-   * run anything which invokes the flow function as there's no persistent Python to run it.
-   *
-   * @return the multi-query tables by destination
+   * Validate multi query table correctness.
    */
   protected[pipelines] def validateMultiQueryTables(): Map[TableIdentifier, Seq[Flow]] = {
     val multiQueryTables = flowsTo.filter(_._2.size > 1)
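Note: validateMultiQueryTables keys off flowsTo, keeping only destinations with more than one flow writing to them. A tiny self-contained illustration with simplified stand-in types:

    // Destinations mapped to the flows that write to them (stand-in types).
    val flowsTo: Map[String, Seq[String]] = Map(
      "events_cleaned" -> Seq("flow_a"),
      "events_merged" -> Seq("flow_b", "flow_c"))

    val multiQueryTables = flowsTo.filter(_._2.size > 1)
    // Map("events_merged" -> Seq("flow_b", "flow_c"))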

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala

Lines changed: 9 additions & 4 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.pipelines.graph
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -38,8 +39,12 @@ case class UnresolvedDatasetException(identifier: TableIdentifier)
  * @param name The name of the table
  * @param cause The cause of the failure
  */
-case class LoadTableException(name: String, override val cause: Option[Throwable])
-    extends AnalysisException(s"Failed to load table '$name'", cause = cause)
+case class LoadTableException(name: String, cause: Option[Throwable])
+    extends SparkException(
+      errorClass = "INTERNAL_ERROR",
+      messageParameters = Map("message" -> s"Failed to load table '$name'"),
+      cause = cause.orNull
+    )
 
 /**
  * Exception raised when a pipeline has one or more flows that cannot be resolved
@@ -70,8 +75,8 @@ case class UnresolvedPipelineException(
          .sorted
          .mkString(", ")}
        |
-       |To view the exceptions that were raised while resolving these flows, look for FlowProgress
-       |logs with status FAILED that precede this log.""".stripMargin
+       |To view the exceptions that were raised while resolving these flows, look for flow
+       |failures that precede this log.""".stripMargin
   )
 
 /** A validation error that can either be thrown as an exception or logged as a warning. */
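Note: LoadTableException now extends SparkException under the INTERNAL_ERROR class instead of AnalysisException. A hedged sketch of constructing it, wrapping the underlying failure as the cause (the table name and cause are made up for illustration):

    val underlying = new RuntimeException("connection to metastore lost")
    val e = LoadTableException("prod.events.raw_events", Some(underlying))

    // The wrapped failure is preserved as the cause; the message is routed
    // through the INTERNAL_ERROR error class.
    assert(e.getCause == underlying)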

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/QueryOrigin.scala

Lines changed: 0 additions & 3 deletions
@@ -29,8 +29,6 @@ import org.apache.spark.sql.pipelines.logging.SourceCodeLocation
  *
  * @param language The language used by the user to define the query.
  * @param fileName The file name of the user code that defines the query.
- * @param cellNumber The cell number of the user code that defines the query.
- *                   Cell numbers are 1-indexed.
  * @param sqlText The SQL text of the query.
  * @param line The line number of the query in the user code.
  *             Line numbers are 1-indexed.
@@ -41,7 +39,6 @@ import org.apache.spark.sql.pipelines.logging.SourceCodeLocation
 case class QueryOrigin(
     language: Option[Language] = None,
     fileName: Option[String] = None,
-    cellNumber: Option[Int] = None,
     sqlText: Option[String] = None,
     line: Option[Int] = None,
     startPosition: Option[Int] = None,
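Note: with cellNumber gone, QueryOrigin is built from the remaining optional fields. An illustrative construction, assuming the parameters not shown in this hunk keep None defaults (values are made up):

    val origin = QueryOrigin(
      language = Some(Language.Sql()),
      fileName = Some("pipeline.sql"),
      sqlText = Some("CREATE STREAMING TABLE events AS SELECT * FROM stream(raw_events)"),
      line = Some(12),
      startPosition = Some(0))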

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/util/InputReadInfo.scala

Lines changed: 2 additions & 7 deletions
@@ -24,29 +24,24 @@ import org.apache.spark.sql.pipelines.util.StreamingReadOptions.EmptyUserOptions
 /**
  * Generic options for a read of an input.
  */
-sealed trait InputReadOptions {
-  // The language of the public API that called this function.
-  def apiLanguage: Language
-}
+sealed trait InputReadOptions
 
 /**
  * Options for a batch read of an input.
  *
  * @param apiLanguage The language of the public API that called this function.
  */
-final case class BatchReadOptions(apiLanguage: Language) extends InputReadOptions
+final case class BatchReadOptions() extends InputReadOptions
 
 /**
  * Options for a streaming read of an input.
 *
- * @param apiLanguage The language of the public API that called this function.
  * @param userOptions Holds the user defined read options.
 * @param droppedUserOptions Holds the options that were specified by the user but
 *                           not actually used. This is a bug but we are preserving this behavior
 *                           for now to avoid making a backwards incompatible change.
 */
 final case class StreamingReadOptions(
-    apiLanguage: Language,
     userOptions: CaseInsensitiveMap[String] = EmptyUserOptions,
     droppedUserOptions: CaseInsensitiveMap[String] = EmptyUserOptions
 ) extends InputReadOptions
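Note: with apiLanguage removed, both option classes can be constructed with defaults; StreamingReadOptions still carries the user's reader options. A short sketch (the option key is illustrative):

    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    val batchOptions = BatchReadOptions()
    val streamingOptions = StreamingReadOptions(
      userOptions = CaseInsensitiveMap(Map("maxFilesPerTrigger" -> "100")))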
