
Commit bdf6f00

aakash-db, sryza, and gengliangwang authored and committed
[SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution
### What changes were proposed in this pull request?

This PR introduces the `DataflowGraph`, a container for Declarative Pipelines datasets and flows, as described in the [Declarative Pipelines SPIP](https://docs.google.com/document/d/1PsSTngFuRVEOvUGzp_25CQL1yfzFHFr02XdMfQ7jOM4/edit?tab=t.0#heading=h.9g6a5f8v6xig). It also adds functionality for:
- Constructing a graph by registering a set of graph elements in succession (`GraphRegistrationContext`)
- "Resolving" a graph, which means resolving each of the flows within it. Resolving a flow means:
  - Validating that its plan can be successfully analyzed
  - Determining the schema of the data it will produce
  - Determining which upstream datasets within the graph it depends on

It also introduces various secondary changes:
* Changes to `SparkBuild` to support declarative pipelines.
* Updates to the `pom.xml` for the module.
* New error conditions.

### Why are the changes needed?

In order to implement Declarative Pipelines.

### Does this PR introduce _any_ user-facing change?

No changes to existing behavior.

### How was this patch tested?

New test suites:
- `ConnectValidPipelineSuite` – test cases where the graph can be successfully resolved
- `ConnectInvalidPipelineSuite` – test cases where the graph fails to be resolved

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#51003 from aakash-db/graph-resolution.

Lead-authored-by: Aakash Japi <aakash.japi@databricks.com>
Co-authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Co-authored-by: Sandy Ryza <sandyryza@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
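For orientation, here is a minimal sketch of the register-then-resolve flow this commit enables. `DataflowGraph` and `GraphRegistrationContext` are the classes the PR introduces, but every method name and signature below is an assumption about the API shape for illustration, not the actual interface:

```scala
// Illustrative sketch only: the package path and all method names below are
// assumptions, not the exact API introduced by this PR.
import org.apache.spark.sql.pipelines.graph.{DataflowGraph, GraphRegistrationContext} // path assumed

val context = new GraphRegistrationContext()

// Register graph elements in succession: a table, then a flow that writes
// to it by streaming from an upstream source.
context.registerTable("events")                                    // hypothetical
context.registerFlow("events", "SELECT * FROM stream(raw_events)") // hypothetical

// Turn the registered elements into a DataflowGraph and resolve it.
// Resolving each flow (a) validates that its plan analyzes, (b) determines
// the schema it will produce, and (c) records which upstream datasets in
// the graph it depends on.
val graph: DataflowGraph = context.toDataflowGraph // hypothetical
val resolved: DataflowGraph = graph.resolve()      // hypothetical
```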
1 parent 3ad7e07 commit bdf6f00

30 files changed: +5722 / -2 lines

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 71 additions & 0 deletions

```diff
@@ -82,6 +82,14 @@
     ],
     "sqlState" : "XX000"
   },
+  "APPEND_ONCE_FROM_BATCH_QUERY" : {
+    "message" : [
+      "Creating a streaming table from a batch query prevents incremental loading of new data from source. Offending table: '<table>'.",
+      "Please use the stream() operator. Example usage:",
+      "CREATE STREAMING TABLE <target table name> ... AS SELECT ... FROM stream(<source table name>) ..."
+    ],
+    "sqlState" : "42000"
+  },
   "ARITHMETIC_OVERFLOW" : {
     "message" : [
       "<message>.<alternative> If necessary set <config> to \"false\" to bypass this error."
@@ -1372,6 +1380,12 @@
     },
     "sqlState" : "42734"
   },
+  "DUPLICATE_FLOW_SQL_CONF" : {
+    "message" : [
+      "Found duplicate sql conf for dataset '<datasetName>': '<key>' is defined by both '<flowName1>' and '<flowName2>'"
+    ],
+    "sqlState" : "42710"
+  },
   "DUPLICATE_KEY" : {
     "message" : [
       "Found duplicate keys <keyColumn>."
@@ -1943,6 +1957,12 @@
     ],
     "sqlState" : "42818"
   },
+  "INCOMPATIBLE_BATCH_VIEW_READ" : {
+    "message" : [
+      "View <datasetIdentifier> is a batch view and must be referenced using SparkSession#read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
+    ],
+    "sqlState" : "42000"
+  },
   "INCOMPATIBLE_COLUMN_TYPE" : {
     "message" : [
       "<operator> can only be performed on tables with compatible column types. The <columnOrdinalNumber> column of the <tableOrdinalNumber> table is <dataType1> type which is not compatible with <dataType2> at the same column of the first table.<hint>."
@@ -2019,6 +2039,12 @@
     ],
     "sqlState" : "42613"
   },
+  "INCOMPATIBLE_STREAMING_VIEW_READ" : {
+    "message" : [
+      "View <datasetIdentifier> is a streaming view and must be referenced using SparkSession#readStream. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
+    ],
+    "sqlState" : "42000"
+  },
   "INCOMPATIBLE_VIEW_SCHEMA_CHANGE" : {
     "message" : [
       "The SQL query of view <viewName> has an incompatible schema change and column <colName> cannot be resolved. Expected <expectedNum> columns named <colName> but got <actualCols>.",
@@ -3119,6 +3145,12 @@
     },
     "sqlState" : "KD002"
   },
+  "INVALID_NAME_IN_USE_COMMAND" : {
+    "message" : [
+      "Invalid name '<name>' in <command> command. Reason: <reason>"
+    ],
+    "sqlState" : "42000"
+  },
   "INVALID_NON_DETERMINISTIC_EXPRESSIONS" : {
     "message" : [
       "The operator expects a deterministic expression, but the actual expression is <sqlExprs>."
@@ -3384,6 +3416,12 @@
     ],
     "sqlState" : "22023"
   },
+  "INVALID_RESETTABLE_DEPENDENCY" : {
+    "message" : [
+      "Tables <upstreamResettableTables> are resettable but have a non-resettable downstream dependency '<downstreamTable>'. `reset` will fail as Spark Streaming does not support deleted source data. You can either remove the <resetAllowedKey>=false property from '<downstreamTable>' or add it to its upstream dependencies."
+    ],
+    "sqlState" : "42000"
+  },
   "INVALID_RESET_COMMAND_FORMAT" : {
     "message" : [
       "Expected format is 'RESET' or 'RESET key'. If you want to include special characters in key, please use quotes, e.g., RESET `key`."
@@ -5419,6 +5457,19 @@
     ],
     "sqlState" : "58030"
   },
+  "UNABLE_TO_INFER_PIPELINE_TABLE_SCHEMA" : {
+    "message" : [
+      "Failed to infer the schema for table <tableName> from its upstream flows.",
+      "Please modify the flows that write to this table to make their schemas compatible.",
+      "",
+      "Inferred schema so far:",
+      "<inferredDataSchema>",
+      "",
+      "Incompatible schema:",
+      "<incompatibleDataSchema>"
+    ],
+    "sqlState" : "42KD9"
+  },
   "UNABLE_TO_INFER_SCHEMA" : {
     "message" : [
       "Unable to infer schema for <format>. It must be specified manually."
@@ -5590,6 +5641,12 @@
     ],
     "sqlState" : "42883"
   },
+  "UNRESOLVED_TABLE_PATH" : {
+    "message" : [
+      "Storage path for table <identifier> cannot be resolved."
+    ],
+    "sqlState" : "22KD1"
+  },
   "UNRESOLVED_USING_COLUMN_FOR_JOIN" : {
     "message" : [
       "USING column <colName> cannot be resolved on the <side> side of the join. The <side>-side columns: [<suggestion>]."
@@ -6571,6 +6628,20 @@
     ],
     "sqlState" : "P0001"
   },
+  "USER_SPECIFIED_AND_INFERRED_SCHEMA_NOT_COMPATIBLE" : {
+    "message" : [
+      "Table '<tableName>' has a user-specified schema that is incompatible with the schema",
+      "inferred from its query.",
+      "<streamingTableHint>",
+      "",
+      "Declared schema:",
+      "<specifiedSchema>",
+      "",
+      "Inferred schema:",
+      "<inferredDataSchema>"
+    ],
+    "sqlState" : "42000"
+  },
   "VARIABLE_ALREADY_EXISTS" : {
     "message" : [
       "Cannot create the variable <variableName> because it already exists.",
```
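For context on how conditions like these surface at runtime: an entry in error-conditions.json is raised through Spark's error-class framework, with the message template's placeholders filled from a parameter map at the call site. A minimal sketch, assuming the standard `AnalysisException` error-class constructor; the real call sites for these conditions live in the new pipelines module:

```scala
import org.apache.spark.sql.AnalysisException

// Sketch: raising the UNRESOLVED_TABLE_PATH condition defined above. The
// <identifier> placeholder in the message template is filled from the map.
throw new AnalysisException(
  errorClass = "UNRESOLVED_TABLE_PATH",
  messageParameters = Map("identifier" -> "`spark_catalog`.`default`.`events`"))
```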
sql/pipelines/pom.xml

Lines changed: 74 additions & 2 deletions

```diff
@@ -16,7 +16,8 @@
   ~ limitations under the License.
   -->

-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
@@ -40,7 +41,6 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -49,6 +49,78 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql-api_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.bytebuddy</groupId>
+      <artifactId>byte-buddy</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.bytebuddy</groupId>
+      <artifactId>byte-buddy-agent</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+
+    <!--
+      This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
+      them will yield errors.
+    -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
```
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/AnalysisWarning.scala

Lines changed: 33 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.pipelines

/** Represents a warning generated as part of graph analysis. */
sealed trait AnalysisWarning

object AnalysisWarning {

  /**
   * Warning that some streaming reader options are being dropped.
   *
   * @param sourceName Source for which reader options are being dropped.
   * @param droppedOptions Set of reader options that are being dropped for a specific source.
   */
  case class StreamingReaderOptionsDropped(sourceName: String, droppedOptions: Seq[String])
    extends AnalysisWarning
}
```
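Because `AnalysisWarning` is a sealed trait, consumers can match on it exhaustively. A minimal consumer sketch; the `describe` helper is hypothetical and not part of this PR:

```scala
import org.apache.spark.sql.pipelines.AnalysisWarning

// Hypothetical helper: format an analysis warning for logging. The match is
// exhaustive because AnalysisWarning is sealed.
def describe(warning: AnalysisWarning): String = warning match {
  case AnalysisWarning.StreamingReaderOptionsDropped(sourceName, droppedOptions) =>
    s"Source '$sourceName': dropped streaming reader options ${droppedOptions.mkString(", ")}"
}
```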
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/Language.scala

Lines changed: 26 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.pipelines

/** The language in which a pipeline dataset or flow is defined. */
sealed trait Language

object Language {
  case class Python() extends Language
  case class Sql() extends Language
}
```
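Likewise, `Language` can be matched exhaustively; a small sketch with a hypothetical helper:

```scala
import org.apache.spark.sql.pipelines.Language

// Hypothetical helper: map a definition language to a source file extension.
// Exhaustive because Language is sealed.
def sourceFileExtension(language: Language): String = language match {
  case Language.Python() => ".py"
  case Language.Sql() => ".sql"
}
```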
