
[SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution #51003


Open · wants to merge 11 commits into base: master

Conversation

aakash-db
Contributor

@aakash-db aakash-db commented May 23, 2025

What changes were proposed in this pull request?

This PR introduces the DataflowGraph, a container for Declarative Pipelines datasets and flows, as described in the Declarative Pipelines SPIP. It also adds functionality for the following (a toy sketch of these concepts follows the list):

  • Constructing a graph by registering a set of graph elements in succession (GraphRegistrationContext)
  • "Resolving" a graph, which means resolving each of the flows within a graph. Resolving a flow means:
    • Validating that its plan can be successfully analyzed
    • Determining the schema of the data it will produce
    • Determining what upstream datasets within the graph it depends on
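To make the registration-then-resolution flow concrete, here is a small self-contained toy model of those concepts in Scala. It is not the PR's API: the names GraphRegistrationContext and DataflowGraph are reused for readability, but every member below is illustrative only.

// Toy illustration only; the real classes in this PR have different members.
final case class Flow(name: String, inputs: Set[String])

final class GraphRegistrationContext {
  private val datasets = scala.collection.mutable.LinkedHashSet.empty[String]
  private val flows = scala.collection.mutable.ListBuffer.empty[Flow]

  // Register graph elements in succession.
  def registerDataset(name: String): Unit = datasets += name
  def registerFlow(flow: Flow): Unit = flows += flow

  def toDataflowGraph: DataflowGraph = DataflowGraph(datasets.toSet, flows.toList)
}

final case class DataflowGraph(datasets: Set[String], flows: Seq[Flow]) {
  // "Resolve" each flow: check that its inputs exist in the graph and record what it depends on.
  def resolve(): Map[String, Set[String]] = flows.map { f =>
    val missing = f.inputs.diff(datasets)
    require(missing.isEmpty, s"Flow ${f.name} reads unknown datasets: ${missing.mkString(", ")}")
    f.name -> f.inputs
  }.toMap
}

object ToyPipelineExample extends App {
  val ctx = new GraphRegistrationContext
  ctx.registerDataset("raw_events")
  ctx.registerDataset("clean_events")
  ctx.registerFlow(Flow("clean_events_flow", inputs = Set("raw_events")))
  println(ctx.toDataflowGraph.resolve()) // Map(clean_events_flow -> Set(raw_events))
}

Note that the real resolution described above also analyzes each flow's plan and infers its output schema; the toy model only covers the dependency-validation part.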

It also introduces various secondary changes:

  • Changes to SparkBuild to support declarative pipelines.
  • Updates to the pom.xml for the module.
  • New error conditions

Why are the changes needed?

In order to implement Declarative Pipelines.

Does this PR introduce any user-facing change?

No changes to existing behavior.

How was this patch tested?

New test suites:

  • ConnectValidPipelineSuite – test cases where the graph can be successfully resolved
  • ConnectInvalidPipelineSuite – test cases where the graph fails to be resolved

Was this patch authored or co-authored using generative AI tooling?

No

@sryza sryza changed the title [SPARK-52283][CONNECT] SDP DataflowGraph creation and resolution [SPARK-52283][CONNECT] Declarative Pipelines DataflowGraph creation and resolution May 23, 2025
@sryza sryza self-requested a review May 23, 2025 21:01
@sryza sryza self-assigned this May 23, 2025
@jonmio jonmio left a comment

flushing some comments

@@ -2025,6 +2031,18 @@
],
"sqlState" : "42613"
},
"INCOMPATIBLE_BATCH_VIEW_READ": {
"message": [
"View <datasetIdentifier> is not a streaming view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."

What is the purpose of this conf and do we really need it?

* @param upstreamNodes Upstream nodes for the node
* @return
*/
def processNode(node: GraphElement, upstreamNodes: Seq[GraphElement]): Seq[GraphElement] = {

Nit: document the return value. I'm especially curious why this is a Seq and when processNode would return more than one element.

Contributor Author

Right now, it's mostly just for flexibility - in case one node maps to several in the future.
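A possible way to spell this out in the doc, per the nit above (the wording is a suggestion only; the signature is the one quoted earlier):

/**
 * Processes a single graph element.
 *
 * @param upstreamNodes Upstream nodes for the node
 * @return the processed graph element(s). Currently this is always exactly one element;
 *         a Seq is returned to leave room for one node mapping to several in the future.
 */
def processNode(node: GraphElement, upstreamNodes: Seq[GraphElement]): Seq[GraphElement] = {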

@apache apache deleted a comment from aakash-db May 27, 2025
@aakash-db aakash-db requested a review from sryza May 27, 2025 22:40
@aakash-db aakash-db changed the title [SPARK-52283][CONNECT] Declarative Pipelines DataflowGraph creation and resolution [SPARK-52283] Declarative Pipelines DataflowGraph creation and resolution May 27, 2025
@aakash-db aakash-db changed the title [SPARK-52283] Declarative Pipelines DataflowGraph creation and resolution [SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution May 27, 2025
@sryza sryza requested a review from cloud-fan May 27, 2025 22:53
val materializedFlowIdentifiers: Set[TableIdentifier] = materializedFlows.map(_.identifier).toSet

/** Returns a [[Table]] given its identifier */
lazy val table: Map[TableIdentifier, Table] =
Contributor

TableIdentifier only supports a 3-level namespace. Shall we use Seq[String] to better support DS v2, which can have an arbitrary number of namespace levels?

Contributor

Seq[String] is a bit hard to use here. We can switch to the DS v2 API after we create an encapsulation class to match TableIdentifier.
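To illustrate the trade-off being discussed, a minimal sketch (the multi-part name below is just an example):

import org.apache.spark.sql.catalyst.TableIdentifier

// TableIdentifier supports at most a catalog.database.table style name (3 levels).
val v1Identifier = TableIdentifier("events", Some("db"))

// A DS v2 multi-part identifier can have arbitrarily many namespace levels, which is
// why Seq[String] (or an encapsulating class wrapping it) is suggested above.
val v2Name: Seq[String] = Seq("prod_catalog", "sales", "emea", "events")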

@aakash-db aakash-db requested a review from jonmio May 28, 2025 05:52
* construct it.
* @param batchInputs the complete inputs read by the flow
* @param streamingInputs the incremental inputs read by the flow
* @param usedExternalInputs the identifiers of the external inputs read by the flow
Contributor

can we put some examples? I don't quite understand what it is given it's just a string.

Contributor Author

Hm, not sure what exactly you mean by examples. An input is just a string name of a table, and here, this just refers to tables (or views, etc) outside of this pipeline.

Contributor

It looks like usedExternalInputs actually isn't used anywhere. Can we just take it out?

Contributor

if it's for tables/views, why not use TableIdentifier instead of String?


/**
* Normalized storage location used for storing materializations for this [[Output]].
* If [[None]], it means this [[Output]] has not been normalized yet.
Contributor

the output must be a file source?

Contributor Author

Doesn't have to be a file, but we expect all tables to have a file path.

Contributor

A JDBC table doesn't have a file path. Maybe we should be clear that the pipeline sink table must be file-source based.

@@ -1949,6 +1955,12 @@
],
"sqlState" : "42818"
},
"INCOMPATIBLE_BATCH_VIEW_READ" : {
"message" : [
"View <datasetIdentifier> is not a batch view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
Contributor
@cloud-fan cloud-fan May 29, 2025

Suggested change
"View <datasetIdentifier> is not a batch view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
"View <datasetIdentifier> is a batch view and must be referenced using `SparkSession#read`. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."

@@ -2025,6 +2037,12 @@
],
"sqlState" : "42613"
},
"INCOMPATIBLE_STREAMING_VIEW_READ" : {
"message" : [
"View <datasetIdentifier> is a streaming view and must be referenced using readStream. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
Contributor

Suggested change
"View <datasetIdentifier> is a streaming view and must be referenced using readStream. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
"View <datasetIdentifier> is a streaming view and must be referenced using `SparkSession#readStream`. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."

val resolvedFlow = flowResolver.attemptResolveFlow(
flow,
rawGraph.inputIdentifiers,
resolvedInputs.asScala.toMap
Contributor

Shall we let attemptResolveFlow take a Java hash map so that we can save this Java -> Scala conversion?
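A minimal sketch of the conversion being discussed; the map types below are stand-ins, not the PR's classes:

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

val resolvedInputs = new ConcurrentHashMap[String, Int]()
resolvedInputs.put("raw_events", 1)

// What the call above does today: copy the Java map into an immutable Scala Map per call.
val scalaCopy: Map[String, Int] = resolvedInputs.asScala.toMap

// The suggestion: let the callee accept the Java map directly and skip the per-call copy.
def attemptResolveFlowLike(inputs: java.util.Map[String, Int]): Int = inputs.size()
attemptResolveFlowLike(resolvedInputs)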

isStreamingTableOpt = Option(resolvedFlowsToTable.exists(f => f.df.isStreaming))
)

// Table will be virtual in either of the following scenarios:
Contributor

But we always create VirtualTableInput; I don't quite understand the conditions here.

f.inputs.toSeq
.map(availableResolvedInputs(_))
.filter {
// Input is a flow implies that the upstream table is a View.
Contributor

I find it hard to understand this comment. We are resolving a flow, and the input of a flow can be other flows? Why does that mean the upstream table is a view?
