[SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution #51003

Closed
aakash-db wants to merge 33 commits into apache:master from aakash-db:graph-resolution

Conversation


@aakash-db aakash-db commented May 23, 2025

What changes were proposed in this pull request?

This PR introduces the DataflowGraph, a container for Declarative Pipelines datasets and flows, as described in the Declarative Pipelines SPIP. It also adds functionality for

  • Constructing a graph by registering a set of graph elements in succession (GraphRegistrationContext); see the sketch after this list
  • "Resolving" a graph, which means resolving each of the flows within a graph. Resolving a flow means:
    • Validating that its plan can be successfully analyzed
    • Determining the schema of the data it will produce
    • Determining what upstream datasets within the graph it depends on
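
As a rough illustration of how registration and resolution fit together, here is a minimal sketch. It borrows the helper names that appear in this PR's test suites (TestGraphRegistrationContext, registerView, registerTable, sqlFlowFunc); the toDataflowGraph conversion and the exact resolve() call are assumptions based on this description, not the verbatim API:

```scala
// Sketch only: the helper names come from this PR's test suites; toDataflowGraph and
// resolve() are assumed from the description above rather than the real API surface.
// Assumes an active SparkSession named `spark`, as in the test suites.
import org.apache.spark.sql.SparkSession

class MyPipeline(spark: SparkSession) extends TestGraphRegistrationContext(spark) {
  registerView("a", query = sqlFlowFunc(spark, "SELECT id FROM range(3)"))
  registerTable("b", query = Option(sqlFlowFunc(spark, "SELECT id * 2 AS doubled FROM a")))
}

val unresolved = new MyPipeline(spark).toDataflowGraph // register datasets and flows
val resolved = unresolved.resolve() // analyze each flow's plan, infer its schema,
                                    // and record its upstream dependencies
```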

It also introduces various secondary changes:

  • Changes to SparkBuild to support declarative pipelines.
  • Updates to the pom.xml for the module.
  • New error conditions

Why are the changes needed?

In order to implement Declarative Pipelines.

Does this PR introduce any user-facing change?

No changes to existing behavior.

How was this patch tested?

New test suites:

  • ConnectValidPipelineSuite – test cases where the graph can be successfully resolved
  • ConnectInvalidPipelineSuite – test cases where the graph fails to be resolved

Was this patch authored or co-authored using generative AI tooling?

No

@sryza sryza changed the title [SPARK-52283][CONNECT] SDP DataflowGraph creation and resolution [SPARK-52283][CONNECT] Declarative Pipelines DataflowGraph creation and resolution May 23, 2025
@sryza sryza self-requested a review May 23, 2025 21:01
@sryza sryza self-assigned this May 23, 2025
@jonmio jonmio left a comment

flushing some comments

@@ -2025,6 +2031,18 @@
],
"sqlState" : "42613"
},
"INCOMPATIBLE_BATCH_VIEW_READ": {
"message": [
"View <datasetIdentifier> is not a streaming view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."

What is the purpose of this conf and do we really need it?

* @param upstreamNodes Upstream nodes for the node
* @return
*/
def processNode(node: GraphElement, upstreamNodes: Seq[GraphElement]): Seq[GraphElement] = {

Nit: Document return. I'm especially curious why this is a Seq and when processNode would return more than one element

Contributor Author

Right now, it's mostly just for flexibility - in case one node maps to several in the future.
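For reference, a hedged sketch of how the scaladoc could document the return value per this thread; only the signature comes from the diff above, and the body is a stub:

```scala
/**
 * Processes a single element of the graph.
 *
 * @param node          The node being processed.
 * @param upstreamNodes Upstream nodes for the node.
 * @return The processed element(s). Today this is always a single element, but the
 *         Seq return type leaves room for one node to map to several in the future.
 */
def processNode(node: GraphElement, upstreamNodes: Seq[GraphElement]): Seq[GraphElement] = {
  Seq(node) // stub body; the real implementation lives in this PR
}
```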

@apache apache deleted a comment from aakash-db May 27, 2025
@aakash-db aakash-db requested a review from sryza May 27, 2025 22:40
@aakash-db aakash-db changed the title [SPARK-52283][CONNECT] Declarative Pipelines DataflowGraph creation and resolution [SPARK-52283] Declarative Pipelines DataflowGraph creation and resolution May 27, 2025
@aakash-db aakash-db changed the title [SPARK-52283] Declarative Pipelines DataflowGraph creation and resolution [SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution May 27, 2025
@sryza sryza requested a review from cloud-fan May 27, 2025 22:53
val materializedFlowIdentifiers: Set[TableIdentifier] = materializedFlows.map(_.identifier).toSet

/** Returns a [[Table]] given its identifier */
lazy val table: Map[TableIdentifier, Table] =
Contributor

TableIdentifier only supports a 3-level namespace. Shall we use Seq[String] to better support DS v2, which can have an arbitrary number of namespace levels?

Contributor

Seq[String] is a bit hard to use here. We can switch to the DS v2 API after we create an encapsulation class to match TableIdentifier.
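A hedged sketch of the kind of encapsulation class being discussed; GraphIdentifier is a hypothetical name used here for illustration, not something in this PR:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical wrapper over multi-part DS v2 names that can still round-trip to the
// legacy TableIdentifier when the namespace has at most two levels.
case class GraphIdentifier(nameParts: Seq[String]) {
  require(nameParts.nonEmpty, "identifier must have at least one name part")

  /** The object name is the last part; everything before it is the namespace. */
  def name: String = nameParts.last
  def namespace: Seq[String] = nameParts.dropRight(1)

  /** Best-effort conversion to the 3-level TableIdentifier used in this PR. */
  def toTableIdentifier: TableIdentifier = namespace match {
    case Seq() => TableIdentifier(name)
    case Seq(db) => TableIdentifier(name, Some(db))
    case Seq(catalog, db) => TableIdentifier(name, Some(db), Some(catalog))
    case _ => throw new IllegalArgumentException(
      s"Cannot convert ${nameParts.mkString(".")} to a 3-level TableIdentifier")
  }
}
```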

@aakash-db aakash-db requested a review from jonmio May 28, 2025 05:52
@sryza sryza force-pushed the graph-resolution branch from 27f4aee to f310b4f on June 4, 2025 14:55
@sryza
Contributor

sryza commented Jun 4, 2025

Hey @gengliangwang @LuciferYang – I just rebased the PR on master and pushed an update that fixes those log messages.

@LuciferYang
Contributor

[screenshot]

</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
Contributor

Why do we need this plugin? The pipelines module doesn't seem to have any .proto files that need compilation.

<plugin>
<!-- see http://davidb.github.com/scala-maven-plugin -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
Contributor

The parent pom.xml has already configured this plugin.

// This needs to be consistent with the content of `maven-shade-plugin`.
(assembly / assemblyExcludedJars) := {
val cp = (assembly / fullClasspath).value
val validPrefixes = Set("spark-connect", "unused-", "guava-", "failureaccess-",
@LuciferYang LuciferYang Jun 4, 2025

If I understand correctly, the assembly jar built this way will not include the code related to the spark-pipelines module. Is this what we expect? And does the assembly jar for pipelines really need to include grpc-*.jar? These should all be configured on demand.

In addition, if the pipelines module uses the assembly plugin, the corresponding maven-shade-plugin configuration should be added to the pom.xml file, and it should be ensured that their packaging results are consistent.

@@ -16,7 +16,8 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
Contributor

What's the difference from before? Can we not modify this line?

@gengliangwang gengliangwang left a comment

LGTM. Please address LuciferYang's comment in this PR or in a follow-up

@sryza
Contributor

sryza commented Jun 4, 2025

@LuciferYang thanks for catching these issues in the pom.xml and SparkBuild. @gengliangwang @LuciferYang I created a follow-up PR that implements the pom.xml and SparkBuild changes @LuciferYang raised.

If addressing the build issues in a follow-up is OK, I'd love to get this PR back in ASAP, because there are several downstream PRs that depend on it and are currently in review.

@LuciferYang
Contributor

LuciferYang commented Jun 4, 2025

LGTM. Please address LuciferYang's comment in this PR or in a follow-up

Please give me a moment to manually verify the Maven compilation for this PR to avoid any failures in the Maven daily test after merging it.

@LuciferYang
Contributor

LuciferYang commented Jun 4, 2025

I locally ran build/mvn clean install -DskipTests -pl sql/pipelines -am:

[screenshot of the Maven build failure]

Could you make the Maven build happy, @sryza?

@@ -44,14 +45,125 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
@LuciferYang LuciferYang Jun 4, 2025

I changed

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
to

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

and the Maven compilation passed. However, when I executed the command build/mvn test -pl sql/pipelines to run the Maven tests, there were test failures:

ConnectValidPipelineSuite:
org.apache.spark.sql.pipelines.graph.ConnectValidPipelineSuite *** ABORTED ***
  java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.sql.pipelines.graph.ConnectInvalidPipelineSuite.<init>(ConnectInvalidPipelineSuite.scala:30)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
java.base/java.lang.reflect.ReflectAccess.newInstance(ReflectAccess.java:128)
java.base/jdk.internal.reflect.ReflectionFactory.newInstance(ReflectionFactory.java:347)
java.base/java.lang.Class.newInstance(Class.java:645)
org.scalatest.tools.DiscoverySuite$.getSuiteInstance(DiscoverySuite.scala:66)
org.scalatest.tools.DiscoverySuite.$anonfun$nestedSuites$1(DiscoverySuite.scala:38)
scala.collection.immutable.Vector1.map(Vector.scala:2155)
scala.collection.immutable.Vector1.map(Vector.scala:386)
org.scalatest.tools.DiscoverySuite.<init>(DiscoverySuite.scala:37)
org.scalatest.tools.Runner$.genDiscoSuites$1(Runner.scala:1131)
org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1225)
org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
org.scalatest.tools.Runner$.main(Runner.scala:775)

And it was stopped at:

org.apache.spark.sql.pipelines.utils.PipelineTest.afterAll(PipelineTest.scala:136)

Contributor

Investigating now

Contributor

I looked into this more deeply. What appears to be happening is:

  • The test framework creates an instance of ConnectInvalidPipelineSuite
    • The initializer invokes SparkSession.builder.getOrCreate() to create a SparkSession and accompanying SparkContext
  • The test framework creates an instance of ConnectValidPipelineSuite
    • The initializer invokes SparkSession.builder.getOrCreate() to create a SparkSession. It gets back the same SparkSession and backing SparkContext that ConnectInvalidPipelineSuite references.
  • The test framework runs all tests inside ConnectInvalidPipelineSuite
  • The test framework invokes the teardown code in ConnectInvalidPipelineSuite, which stops its SparkSession and underlying SparkContext
  • The test framework tries to run beforeEach code in ConnectValidPipelineSuite, but the SparkSession it references is the same one that was referenced in ConnectInvalidPipelineSuite, and that has already been stopped

How does this work normally? There are several suites inside the repo that getOrCreate a SparkSession at instantiation time and stop it afterAll. Do they not run into this problem?
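For context, here is a minimal, self-contained sketch of that lifecycle (suite names here are illustrative, not the actual test classes):

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

// Both suites call getOrCreate() at construction time, so they end up sharing the
// same underlying SparkContext.
abstract class SharedSessionSuite extends AnyFunSuite with BeforeAndAfterAll {
  val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("shared-session-repro")
    .getOrCreate()

  override def afterAll(): Unit = {
    spark.stop() // stopping this session also stops the shared SparkContext
    super.afterAll()
  }
}

class SuiteA extends SharedSessionSuite {
  test("runs fine") { assert(spark.range(3).count() == 3) }
}

// If SuiteA's afterAll has already run, this suite's `spark` points at a stopped
// SparkContext and fails with "Cannot call methods on a stopped SparkContext".
class SuiteB extends SharedSessionSuite {
  test("runs after SuiteA tears down") { assert(spark.range(3).count() == 3) }
}
```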

Contributor

diff --git a/sql/pipelines/pom.xml b/sql/pipelines/pom.xml
index 4ab7db079e4..5cbcebfbc8f 100644
--- a/sql/pipelines/pom.xml
+++ b/sql/pipelines/pom.xml
@@ -41,6 +41,12 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
index 01b2a91bb93..0d74146851c 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.types.{IntegerType, StructType}
  */
 class ConnectInvalidPipelineSuite extends PipelineTest {
 
-  import originalSpark.implicits._
   test("Missing source") {
     class P extends TestGraphRegistrationContext(spark) {
       registerView("b", query = readFlowFunc("a"))
@@ -144,6 +143,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("Missing attribute in the schema") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1, 2, 3).toDF("z")))
       registerView("b", query = sqlFlowFunc(spark, "SELECT x FROM a"))
@@ -157,6 +158,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("Joining on a column with different names") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")))
       registerView("b", query = dfFlowFunc(Seq("a", "b", "c").toDF("y")))
@@ -175,6 +178,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("Writing to one table by unioning flows with different schemas") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")))
       registerView("b", query = dfFlowFunc(Seq(true, false).toDF("x")))
@@ -225,6 +230,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("Cyclic graph") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")))
       registerView("b", query = sqlFlowFunc(spark, "SELECT * FROM a UNION SELECT * FROM d"))
@@ -246,6 +253,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("Cyclic graph with materialized nodes") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerTable("a", query = Option(dfFlowFunc(Seq(1, 2, 3).toDF("x"))))
       registerTable(
@@ -270,6 +279,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("Cyclic graph - second query makes it cyclic") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerTable("a", query = Option(dfFlowFunc(Seq(1, 2, 3).toDF("x"))))
       registerTable("b")
@@ -293,6 +304,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("Cyclic graph - all named queries") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerTable("a", query = Option(dfFlowFunc(Seq(1, 2, 3).toDF("x"))))
       registerTable("b")
@@ -317,6 +330,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("view-table conf conflict") {
+    val session = spark
+    import session.implicits._
     val p = new TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1).toDF()), sqlConf = Map("x" -> "a-val"))
       registerTable("b", query = Option(readFlowFunc("a")), sqlConf = Map("x" -> "b-val"))
@@ -338,6 +353,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("view-view conf conflict") {
+    val session = spark
+    import session.implicits._
     val p = new TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1).toDF()), sqlConf = Map("x" -> "a-val"))
       registerView("b", query = dfFlowFunc(Seq(1).toDF()), sqlConf = Map("x" -> "b-val"))
@@ -364,6 +381,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("reading a complete view incrementally") {
+    val session = spark
+    import session.implicits._
     val p = new TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1).toDF()))
       registerTable("b", query = Option(readStreamFlowFunc("a")))
@@ -380,6 +399,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("reading an incremental view completely") {
+    val session = spark
+    import session.implicits._
     val p = new TestGraphRegistrationContext(spark) {
       val mem = MemoryStream[Int]
       mem.addData(1)
@@ -398,6 +419,8 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
   }
 
   test("Inferred schema that isn't a subset of user-specified schema") {
+    val session = spark
+    import session.implicits._
     val graph1 = new TestGraphRegistrationContext(spark) {
       registerTable(
         "a",
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
index f8b5133ff16..d64c5f06cdf 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
@@ -32,9 +32,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  */
 class ConnectValidPipelineSuite extends PipelineTest {
 
-  import originalSpark.implicits._
-
   test("Extra simple") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("b", query = dfFlowFunc(Seq(1, 2, 3).toDF("y")))
     }
@@ -44,6 +44,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Simple") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")))
       registerView("b", query = sqlFlowFunc(spark, "SELECT x as y FROM a"))
@@ -65,6 +67,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Dependencies") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")))
       registerView("c", query = sqlFlowFunc(spark, "SELECT y as z FROM b"))
@@ -78,6 +82,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Multi-hop schema merging") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView(
         "b",
@@ -93,6 +99,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Cross product join merges schema") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")))
       registerView("b", query = dfFlowFunc(Seq(4, 5, 6).toDF("y")))
@@ -111,6 +119,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Real join merges schema") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq((1, "a"), (2, "b"), (3, "c")).toDF("x", "y")))
       registerView("b", query = dfFlowFunc(Seq((2, "m"), (3, "n"), (4, "o")).toDF("x", "z")))
@@ -132,6 +142,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Union of streaming and batch Dataframes") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       val ints = MemoryStream[Int]
       ints.addData(1, 2, 3, 4)
@@ -170,6 +182,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Union of two streaming Dataframes") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       val ints1 = MemoryStream[Int]
       ints1.addData(1, 2, 3, 4)
@@ -214,6 +228,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("MultipleInputs") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")))
       registerView("b", query = dfFlowFunc(Seq(4, 5, 6).toDF("y")))
@@ -228,6 +244,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Connect retains and fuses confs") {
+    val session = spark
+    import session.implicits._
     // a -> b \
     //          d
     //      c /
@@ -250,6 +268,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Confs aren't fused past materialization points") {
+    val session = spark
+    import session.implicits._
     val p = new TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1).toDF("x")), Map("a" -> "a-val"))
       registerTable("b", query = Option(readFlowFunc("a")), Map("b" -> "b-val"))
@@ -276,6 +296,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Setting the same conf with the same value is totally cool") {
+    val session = spark
+    import session.implicits._
     val p = new TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")), Map("key" -> "val"))
       registerView("b", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")), Map("key" -> "val"))
@@ -290,6 +312,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Named query only") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       registerView("a", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")))
       registerTable("b")
@@ -308,6 +332,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Default query and named query") {
+    val session = spark
+    import session.implicits._
     class P extends TestGraphRegistrationContext(spark) {
       val mem = MemoryStream[Int]
       registerView("a", query = dfFlowFunc(mem.toDF()))
@@ -348,6 +374,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Correct types of flows after connection") {
+    val session = spark
+    import session.implicits._
     val graph = new TestGraphRegistrationContext(spark) {
       val mem = MemoryStream[Int]
       mem.addData(1, 2)
@@ -402,6 +430,8 @@ class ConnectValidPipelineSuite extends PipelineTest {
   }
 
   test("Pipeline level default spark confs are applied with correct precedence") {
+    val session = spark
+    import session.implicits._
     val P = new TestGraphRegistrationContext(
       spark,
       Map("default.conf" -> "value")
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
index 981fa3cdcae..9039cc0e338 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala
@@ -31,7 +31,7 @@ import org.scalatest.matchers.should.Matchers
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, QueryTest, Row, TypedColumn}
-import org.apache.spark.sql.SparkSession.{clearActiveSession, setActiveSession}
+import org.apache.spark.sql.SparkSession.{clearActiveSession, clearDefaultSession, setActiveSession, setDefaultSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.classic.{DataFrame, Dataset, SparkSession, SQLContext}
 import org.apache.spark.sql.execution._
@@ -48,8 +48,7 @@ abstract class PipelineTest
 
   final protected val storageRoot = createTempDir()
 
-  var spark: SparkSession = createAndInitializeSpark()
-  val originalSpark: SparkSession = spark.cloneSession()
+  var spark: SparkSession = _
 
   implicit def sqlContext: SQLContext = spark.sqlContext
   def sql(text: String): DataFrame = spark.sql(text)
@@ -111,8 +110,23 @@ abstract class PipelineTest
   /** Set up the spark session before each test. */
   protected def initializeSparkBeforeEachTest(): Unit = {
     clearActiveSession()
-    spark = originalSpark.newSession()
+    clearDefaultSession()
+    spark = spark.newSession()
     setActiveSession(spark)
+    setDefaultSession(spark)
+  }
+
+  protected def initializeSession(): Unit = {
+    if (spark == null) {
+      spark = createAndInitializeSpark()
+    }
+  }
+
+  protected override def beforeAll(): Unit = {
+    initializeSession()
+
+    // Ensure we have initialized the context before calling parent code
+    super.beforeAll()
   }
 
   override def beforeEach(): Unit = {
@@ -133,7 +147,23 @@ abstract class PipelineTest
   }
 
   override def afterAll(): Unit = {
-    spark.stop()
+    try {
+      super.afterAll()
+    } finally {
+      try {
+        if (spark != null) {
+          try {
+            spark.sessionState.catalog.reset()
+          } finally {
+            spark.stop()
+            spark = null
+          }
+        }
+      } finally {
+        clearActiveSession()
+        clearDefaultSession()
+      }
+    }
   }
 
   protected def gridTest[A](testNamePrefix: String, testTags: Tag*)(params: Seq[A])(

I made some modifications to PipelineTest by referring to SharedSparkSession, as shown above. Then I executed build/mvn clean install -pl sql/pipelines, and the compilation and tests were successful. The aforementioned modifications are for reference only.

[screenshot of the successful Maven build and test run]

Contributor

Thanks for all the attention you're giving this. I independently converged on a similar solution, actually. I'm still a little bit confused about how this works for other tests that create a SparkSession more globally, like in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SqlBasedBenchmark.scala.

I pushed this change though. Hopefully adding the spark-core_ back in doesn't bring back the duplicate dependencies issue we were running into before 🤞 .

@sryza sryza Jun 4, 2025

(I also pulled in the pom.xml and SparkBuild changes from the downstream PR)

@sryza
Contributor

sryza commented Jun 4, 2025

I verified that these both work on the latest version:

build/mvn clean install -DskipTests -pl sql/pipelines -am

build/mvn test -pl sql/pipelines

@cloud-fan
Contributor

The docker test failure is unrelated, but I'll wait for @LuciferYang to do a final sign-off on these build changes.

@LuciferYang
Contributor

I verified that these both work on the latest version:

build/mvn clean install -DskipTests -pl sql/pipelines -am

build/mvn test -pl sql/pipelines

double checked, thanks @sryza

@gengliangwang
Member

@LuciferYang @sryza thanks for fixing the test failures!
Merging to master

gengliangwang added a commit that referenced this pull request Jun 6, 2025
… event logging

### What changes were proposed in this pull request?

**See the flow chart describing the changes made in this PR: [flow chart link](https://lucid.app/lucidchart/c773b051-c634-4f0e-9a3c-a21e24ae540a/edit?viewport_loc=-4594%2C-78%2C5884%2C3280%2C0_0&invitationId=inv_3f036b9d-1a2a-4dd9-bf50-084cd90e5460)**

As described in [Declarative Pipelines SPIP](https://docs.google.com/document/d/1PsSTngFuRVEOvUGzp_25CQL1yfzFHFr02XdMfQ7jOM4/edit?tab=t.0#heading=h.9g6a5f8v6xig), after we parse the user's code and represent datasets and dataflows in a `DataflowGraph` (from PR #51003), we execute the `DataflowGraph`. This PR implements that execution.

## Main execution steps inside a pipeline run

### Step 1: Initialize the raw `DataflowGraph`
In `PipelineExecution::runPipeline()`, we first initialize the dataflow graph by topologically sorting the dependencies and figuring out the expected metadata (e.g., schema) for each dataset (`DataflowGraph::resolve()`). We also run some pre-flight validations to catch early errors such as circular dependencies, creating a streaming table from a batch data source, etc. (`DataflowGraph::validate()`).

### Step 2: Materialize datasets defined in the `DataflowGraph` to the catalog
After the graph is topologically sorted and validated and every dataset / flow has the correct metadata populated, we publish the corresponding datasets to the catalog (which could be Hive, UC, or others) in `DatasetManager::materializeDatasets()`. For example, for each Materialized View and Table, it registers an empty table in the catalog with the correct metadata (e.g., table schema, table properties, etc.). If the table already exists, we alter it to have the correct metadata.

### Step 3: Populate data to the registered tables by executing the `DataflowGraph`
After datasets have been registered to the catalog, inside `TriggeredGraphExecution` we transform each dataflow defined in the `DataflowGraph` into an actual execution plan that runs the workload and populates data into the empty table (we transform each `Flow` into a `FlowExecution` through the `FlowPlanner`).

Each `FlowExecution` will be executed in topological order based on the sorted `DataflowGraph`, and we parallelize the execution as much as possible. Depending on the type of error, failed flows may be retried as part of execution.
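
As a rough sketch of how these three steps compose (the class names are the ones mentioned above, but the traits and signatures here are illustrative stand-ins, not the real `sql/pipelines` API):

```scala
// Illustrative stand-ins only: the real classes live in sql/pipelines with different
// signatures; this just mirrors the three steps described above.
trait ResolvedDataflowGraph { def validate(): Unit }
trait RawDataflowGraph { def resolve(): ResolvedDataflowGraph }

def runPipeline(
    raw: RawDataflowGraph,
    materializeDatasets: ResolvedDataflowGraph => Unit, // stands in for DatasetManager
    executeTriggered: ResolvedDataflowGraph => Unit     // stands in for TriggeredGraphExecution
): Unit = {
  val resolved = raw.resolve()  // Step 1a: topologically sort and infer schemas
  resolved.validate()           // Step 1b: pre-flight checks (cycles, batch/streaming misuse, ...)
  materializeDatasets(resolved) // Step 2: create or alter tables in the catalog
  executeTriggered(resolved)    // Step 3: run each FlowExecution in topological order
}
```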

## Main components of this PR:

- **Flow execution** represents the execution of an individual flow in the dataflow graph. Relevant classes:
  - `FlowExecution`
  - `StreamingFlowExecution`
  - `BatchFlowExecution`
  - `FlowPlanner` – constructs `FlowExecution`s from `Flow` objects
- **Graph execution** represents the execution of an entire dataflow graph, i.e. step 3 in the set of steps above. In the future, we will add a `ContinuousGraphExecution` class, which executes all the streams at once instead of in topological order. Relevant classes:
  - `GraphExecution`
  - `TriggeredGraphExecution` – executes flows in topological order, handles retries when necessary
  - `BackoffStrategy` – used for retries
  - `UncaughtExceptionHandler`
  - `PipelineConf` – a few configurations that control graph execution behavior
- **Pipeline execution** represents a full "run" including all three execution steps above: graph resolution, catalog materialization, and graph execution. Relevant classes:
  - `PipelineExecution`
  - `RunTerminationReason`
  - `PipelineUpdateContext` – represents the parameters to a pipeline execution
  - `PipelineUpdateContextImpl`
- **Catalog materialization** – step 2 in the execution steps described above; represents datasets in the dataflow graph in the catalog, using DSv2 APIs. Relevant classes:
  - `DatasetManager`
- **Graph filtration / selection** allows selecting just a subset of the graph to be executed. In a followup, we will add the plumbing that allows specifying this from the CLI. Relevant classes:
  - `GraphFilter`
- **Events** track the progress of a pipeline execution. The event messages are sent to the client for console logging, and the structured events are available for assertions inside tests. Eventually, these could power info in the Spark UI as well. Relevant classes:
  - `FlowProgressEventLogger`
  - `PipelineRunEventBuffer`
  - `StreamListener`
  - `ConstructPipelineEvent`

### Why are the changes needed?

This PR implements the core functionality for executing a Declarative Pipeline.

### Does this PR introduce _any_ user-facing change?

It introduces new behavior, but does not modify existing behavior.

### How was this patch tested?

New unit test suites:
- `TriggeredGraphExecutionSuite`: tests end-to-end executions of the pipeline under different scenarios (happy path, failure path, etc.) and validates that the proper data has been written and the proper event log is emitted.
- `MaterializeTablesSuite`: tests the logic for registering datasets in the catalog.

Augmented existing test suites:
- `ConstructPipelineEventSuite` and `PipelineEventSuite` to validate the new FlowProgress event log we're introducing.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51050 from SCHJonathan/graph-execution.

Lead-authored-by: Yuheng Chang <jonathanyuheng@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Co-authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
yhuang-db pushed a commit to yhuang-db/spark that referenced this pull request Jun 9, 2025
… resolution

Closes apache#51003 from aakash-db/graph-resolution.

Lead-authored-by: Aakash Japi <aakash.japi@databricks.com>
Co-authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Co-authored-by: Sandy Ryza <sandyryza@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
yhuang-db pushed a commit to yhuang-db/spark that referenced this pull request Jun 9, 2025
… event logging

/**
* Helper method to verify unresolved column error message. We expect three elements to be present
* in the message: error class, unresolved column name, list of suggested columns. There are three
* significant differences between different versions of DBR:
Contributor

@aakash-db Here are some descriptions related to DBR, but since the current project is Apache Spark, could you revise these descriptions accordingly?

Contributor

@LuciferYang thanks for catching this – we'll fix ASAP!


* @param properties Properties of the view
* @param comment when defining a view
*/
case class PersistedView(
Member

Is it the same thing as the well-known "materialized view"?

@pan3793 pan3793 Jun 26, 2025

nvm, I figured it out after reading the code; it represents a View registered in the catalog.
