
Commit b9681a5

aakash-db, sryza, and gengliangwang authored and committed
[SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution
### What changes were proposed in this pull request? This PR introduces the `DataflowGraph`, a container for Declarative Pipelines datasets and flows, as described in the [Declarative Pipelines SPIP](https://docs.google.com/document/d/1PsSTngFuRVEOvUGzp_25CQL1yfzFHFr02XdMfQ7jOM4/edit?tab=t.0#heading=h.9g6a5f8v6xig). It also adds functionality for - Constructing a graph by registering a set of graph elements in succession (`GraphRegistrationContext`) - "Resolving" a graph, which means resolving each of the flows within a graph. Resolving a flow means: - Validating that its plan can be successfully analyzed - Determining the schema of the data it will produce - Determining what upstream datasets within the graph it depends on It also introduces various secondary changes: * Changes to `SparkBuild` to support declarative pipelines. * Updates to the `pom.xml` for the module. * New error conditions ### Why are the changes needed? In order to implement Declarative Pipelines. ### Does this PR introduce _any_ user-facing change? No changes to existing behavior. ### How was this patch tested? New test suites: - `ConnectValidPipelineSuite` – test cases where the graph can be successfully resolved - `ConnectInvalidPipelineSuite` – test cases where the graph fails to be resolved ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#51003 from aakash-db/graph-resolution. Lead-authored-by: Aakash Japi <aakash.japi@databricks.com> Co-authored-by: Sandy Ryza <sandy.ryza@databricks.com> Co-authored-by: Sandy Ryza <sandyryza@gmail.com> Co-authored-by: Gengliang Wang <gengliang@apache.org> Signed-off-by: Gengliang Wang <gengliang@apache.org>
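To make the registration-then-resolution model concrete, here is a conceptual Scala sketch. It is *not* the `DataflowGraph`/`GraphRegistrationContext` API added by this PR; only the ideas of flows, schemas, and upstream dependencies are taken from the description above, and every name below is illustrative.

```scala
// Conceptual illustration only -- not the API from this PR. It models what "resolving" a
// graph means: each flow is checked against the datasets defined in the graph, and flows
// whose upstream inputs are missing fail to resolve.
case class Flow(destination: String, upstream: Set[String], schema: String)

case class Graph(flows: Seq[Flow]) {
  private val defined = flows.map(_.destination).toSet

  // A flow resolves only if every upstream dataset it reads is defined somewhere in the graph.
  def unresolved: Map[String, Set[String]] =
    flows.map(f => f.destination -> (f.upstream -- defined))
      .filter { case (_, missing) => missing.nonEmpty }
      .toMap
}

object GraphResolutionSketch {
  def main(args: Array[String]): Unit = {
    val graph = Graph(Seq(
      Flow("bronze_events", upstream = Set.empty, schema = "id INT, payload STRING"),
      Flow("silver_events", upstream = Set("bronze_events"), schema = "id INT, payload STRING"),
      Flow("gold_summary", upstream = Set("silver_events", "missing_source"), schema = "id INT")
    ))
    // gold_summary cannot be resolved: "missing_source" is not defined in the graph.
    println(graph.unresolved) // Map(gold_summary -> Set(missing_source))
  }
}
```

In the actual module, resolution additionally analyzes each flow's logical plan and infers its output schema; failures surface the new error conditions added below (for example `UNABLE_TO_INFER_PIPELINE_TABLE_SCHEMA`).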
1 parent 601681a commit b9681a5

31 files changed: +5730 −2 lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 71 additions & 0 deletions
@@ -82,6 +82,14 @@
     ],
     "sqlState" : "XX000"
   },
+  "APPEND_ONCE_FROM_BATCH_QUERY" : {
+    "message" : [
+      "Creating a streaming table from a batch query prevents incremental loading of new data from source. Offending table: '<table>'.",
+      "Please use the stream() operator. Example usage:",
+      "CREATE STREAMING TABLE <target table name> ... AS SELECT ... FROM stream(<source table name>) ..."
+    ],
+    "sqlState" : "42000"
+  },
   "ARITHMETIC_OVERFLOW" : {
     "message" : [
       "<message>.<alternative> If necessary set <config> to \"false\" to bypass this error."
@@ -1372,6 +1380,12 @@
     },
     "sqlState" : "42734"
   },
+  "DUPLICATE_FLOW_SQL_CONF" : {
+    "message" : [
+      "Found duplicate sql conf for dataset '<datasetName>': '<key>' is defined by both '<flowName1>' and '<flowName2>'"
+    ],
+    "sqlState" : "42710"
+  },
   "DUPLICATE_KEY" : {
     "message" : [
       "Found duplicate keys <keyColumn>."
@@ -1943,6 +1957,12 @@
     ],
     "sqlState" : "42818"
   },
+  "INCOMPATIBLE_BATCH_VIEW_READ" : {
+    "message" : [
+      "View <datasetIdentifier> is a batch view and must be referenced using SparkSession#read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
+    ],
+    "sqlState" : "42000"
+  },
   "INCOMPATIBLE_COLUMN_TYPE" : {
     "message" : [
       "<operator> can only be performed on tables with compatible column types. The <columnOrdinalNumber> column of the <tableOrdinalNumber> table is <dataType1> type which is not compatible with <dataType2> at the same column of the first table.<hint>."
@@ -2019,6 +2039,12 @@
     ],
     "sqlState" : "42613"
   },
+  "INCOMPATIBLE_STREAMING_VIEW_READ" : {
+    "message" : [
+      "View <datasetIdentifier> is a streaming view and must be referenced using SparkSession#readStream. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
+    ],
+    "sqlState" : "42000"
+  },
   "INCOMPATIBLE_VIEW_SCHEMA_CHANGE" : {
     "message" : [
       "The SQL query of view <viewName> has an incompatible schema change and column <colName> cannot be resolved. Expected <expectedNum> columns named <colName> but got <actualCols>.",
@@ -3119,6 +3145,12 @@
     },
     "sqlState" : "KD002"
   },
+  "INVALID_NAME_IN_USE_COMMAND" : {
+    "message" : [
+      "Invalid name '<name>' in <command> command. Reason: <reason>"
+    ],
+    "sqlState" : "42000"
+  },
   "INVALID_NON_DETERMINISTIC_EXPRESSIONS" : {
     "message" : [
       "The operator expects a deterministic expression, but the actual expression is <sqlExprs>."
@@ -3384,6 +3416,12 @@
     ],
     "sqlState" : "22023"
   },
+  "INVALID_RESETTABLE_DEPENDENCY" : {
+    "message" : [
+      "Tables <upstreamResettableTables> are resettable but have a non-resettable downstream dependency '<downstreamTable>'. `reset` will fail as Spark Streaming does not support deleted source data. You can either remove the <resetAllowedKey>=false property from '<downstreamTable>' or add it to its upstream dependencies."
+    ],
+    "sqlState" : "42000"
+  },
   "INVALID_RESET_COMMAND_FORMAT" : {
     "message" : [
       "Expected format is 'RESET' or 'RESET key'. If you want to include special characters in key, please use quotes, e.g., RESET `key`."
@@ -5419,6 +5457,19 @@
     ],
     "sqlState" : "58030"
   },
+  "UNABLE_TO_INFER_PIPELINE_TABLE_SCHEMA" : {
+    "message" : [
+      "Failed to infer the schema for table <tableName> from its upstream flows.",
+      "Please modify the flows that write to this table to make their schemas compatible.",
+      "",
+      "Inferred schema so far:",
+      "<inferredDataSchema>",
+      "",
+      "Incompatible schema:",
+      "<incompatibleDataSchema>"
+    ],
+    "sqlState" : "42KD9"
+  },
   "UNABLE_TO_INFER_SCHEMA" : {
     "message" : [
       "Unable to infer schema for <format>. It must be specified manually."
@@ -5590,6 +5641,12 @@
     ],
     "sqlState" : "42883"
   },
+  "UNRESOLVED_TABLE_PATH" : {
+    "message" : [
+      "Storage path for table <identifier> cannot be resolved."
+    ],
+    "sqlState" : "22KD1"
+  },
   "UNRESOLVED_USING_COLUMN_FOR_JOIN" : {
     "message" : [
       "USING column <colName> cannot be resolved on the <side> side of the join. The <side>-side columns: [<suggestion>]."
@@ -6571,6 +6628,20 @@
     ],
     "sqlState" : "P0001"
   },
+  "USER_SPECIFIED_AND_INFERRED_SCHEMA_NOT_COMPATIBLE" : {
+    "message" : [
+      "Table '<tableName>' has a user-specified schema that is incompatible with the schema",
+      "inferred from its query.",
+      "<streamingTableHint>",
+      "",
+      "Declared schema:",
+      "<specifiedSchema>",
+      "",
+      "Inferred schema:",
+      "<inferredDataSchema>"
+    ],
+    "sqlState" : "42000"
+  },
   "VARIABLE_ALREADY_EXISTS" : {
     "message" : [
       "Cannot create the variable <variableName> because it already exists.",

project/SparkBuild.scala

Lines changed: 53 additions & 0 deletions
@@ -418,6 +418,8 @@ object SparkBuild extends PomBuild {
 
     enable(HiveThriftServer.settings)(hiveThriftServer)
 
+    enable(SparkDeclarativePipelines.settings)(pipelines)
+
     enable(SparkConnectCommon.settings)(connectCommon)
     enable(SparkConnect.settings)(connect)
     enable(SparkConnectClient.settings)(connectClient)
@@ -884,6 +886,57 @@ object SparkConnectClient {
   )
 }
 
+object SparkDeclarativePipelines {
+  import BuildCommons.protoVersion
+
+  lazy val settings = Seq(
+    // For some reason the resolution from the imported Maven build does not work for some
+    // of these dependendencies that we need to shade later on.
+    libraryDependencies ++= {
+      val guavaVersion =
+        SbtPomKeys.effectivePom.value.getProperties.get(
+          "connect.guava.version").asInstanceOf[String]
+      val guavaFailureAccessVersion =
+        SbtPomKeys.effectivePom.value.getProperties.get(
+          "guava.failureaccess.version").asInstanceOf[String]
+      Seq(
+        "com.google.guava" % "guava" % guavaVersion,
+        "com.google.guava" % "failureaccess" % guavaFailureAccessVersion,
+        "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf"
+      )
+    },
+
+    (assembly / logLevel) := Level.Info,
+
+    // Exclude `scala-library` from assembly.
+    (assembly / assemblyPackageScala / assembleArtifact) := false,
+
+    // SPARK-46733: Include `spark-connect-*.jar`, `unused-*.jar`, `guava-*.jar`,
+    // `failureaccess-*.jar`, `annotations-*.jar`, `grpc-*.jar`, `protobuf-*.jar`,
+    // `gson-*.jar`, `error_prone_annotations-*.jar`, `j2objc-annotations-*.jar`,
+    // `animal-sniffer-annotations-*.jar`, `perfmark-api-*.jar`,
+    // `proto-google-common-protos-*.jar` in assembly.
+    // This needs to be consistent with the content of `maven-shade-plugin`.
+    (assembly / assemblyExcludedJars) := {
+      val cp = (assembly / fullClasspath).value
+      val validPrefixes = Set("spark-connect", "unused-", "guava-", "failureaccess-",
+        "annotations-", "grpc-", "protobuf-", "gson", "error_prone_annotations",
+        "j2objc-annotations", "animal-sniffer-annotations", "perfmark-api",
+        "proto-google-common-protos")
+      cp filterNot { v =>
+        validPrefixes.exists(v.data.getName.startsWith)
+      }
+    },
+
+    (assembly / assemblyMergeStrategy) := {
+      case m if m.toLowerCase(Locale.ROOT).endsWith("manifest.mf") => MergeStrategy.discard
+      // Drop all proto files that are not needed as artifacts of the build.
+      case m if m.toLowerCase(Locale.ROOT).endsWith(".proto") => MergeStrategy.discard
+      case _ => MergeStrategy.first
+    }
+  )
+}
+
 object SparkProtobuf {
   import BuildCommons.protoVersion
 
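With `SparkDeclarativePipelines` enabled above, the new module should be buildable through sbt like the other sub-projects; assuming the sbt project id is `pipelines` (the variable name referenced by the `enable` call), something like `build/sbt pipelines/compile` or `build/sbt pipelines/test` would exercise it. The exact invocation is not part of this diff.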

sql/pipelines/pom.xml

Lines changed: 114 additions & 2 deletions
@@ -16,7 +16,8 @@
   ~ limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
@@ -44,14 +45,125 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql-api_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.bytebuddy</groupId>
+      <artifactId>byte-buddy</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.bytebuddy</groupId>
+      <artifactId>byte-buddy-agent</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+
+    <!--
+      This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
+      them will yield errors.
+    -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>1.7.0</version>
+        <extensions>true</extensions>
+      </plugin>
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <version>0.6.1</version>
+        <configuration>
+          <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+          </protocArtifact>
+          <protoSourceRoot>src/main/protobuf</protoSourceRoot>
+          <pluginId>grpc-java</pluginId>
+          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}
+          </pluginArtifact>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+              <goal>compile-custom</goal>
+              <goal>test-compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <!-- see http://davidb.github.com/scala-maven-plugin -->
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>4.9.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- os-maven-plugin helps resolve ${os.detected.classifier} automatically -->
+    </plugins>
   </build>
 </project>
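Equivalently, the module can be exercised through the Maven reactor with the standard flags, e.g. `build/mvn -pl sql/pipelines -am package`; this command is an illustration and not part of the diff.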
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/AnalysisWarning.scala

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines
+
+/** Represents a warning generated as part of graph analysis. */
+sealed trait AnalysisWarning
+
+object AnalysisWarning {
+
+  /**
+   * Warning that some streaming reader options are being dropped
+   *
+   * @param sourceName Source for which reader options are being dropped.
+   * @param droppedOptions Set of reader options that are being dropped for a specific source.
+   */
+  case class StreamingReaderOptionsDropped(sourceName: String, droppedOptions: Seq[String])
+    extends AnalysisWarning
+}
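A hypothetical consumer of this warning type, for illustration only (no such helper exists in this PR):

```scala
import org.apache.spark.sql.pipelines.AnalysisWarning

// Exhaustive match is possible because AnalysisWarning is sealed.
def render(warning: AnalysisWarning): String = warning match {
  case AnalysisWarning.StreamingReaderOptionsDropped(source, dropped) =>
    s"Dropped reader options [${dropped.mkString(", ")}] for streaming source '$source'"
}
```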
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/Language.scala

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines
+
+sealed trait Language {}
+
+object Language {
+  case class Python() extends Language {}
+  case class Sql() extends Language {}
+}
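A hypothetical pattern match over `Language`, again purely illustrative and not part of the PR:

```scala
import org.apache.spark.sql.pipelines.Language

// Shows how calling code might branch on the language a dataset was defined in.
def commentPrefix(language: Language): String = language match {
  case Language.Python() => "#"
  case Language.Sql() => "--"
}
```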
