[SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution #51003
base: master
Conversation
flushing some comments
@@ -2025,6 +2031,18 @@
      ],
      "sqlState" : "42613"
    },
    "INCOMPATIBLE_BATCH_VIEW_READ": {
      "message": [
        "View <datasetIdentifier> is not a streaming view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
What is the purpose of this conf and do we really need it?
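For reference, the escape hatch the message describes is an ordinary Spark conf; disabling the check should look roughly like this (conf key quoted from the message itself, not otherwise verified):

```scala
// Turn off the batch/streaming view read-compatibility check.
spark.conf.set("pipelines.incompatibleViewCheck.enabled", "false")
```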
   * @param upstreamNodes Upstream nodes for the node
   * @return
   */
  def processNode(node: GraphElement, upstreamNodes: Seq[GraphElement]): Seq[GraphElement] = {
Nit: Document return. I'm especially curious why this is a Seq and when processNode would return more than one element
Right now, it's mostly just for flexibility - in case one node maps to several in the future.
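A minimal, self-contained sketch of what that flexibility would allow, with stand-in types rather than the PR's actual ones:

```scala
sealed trait GraphElement
case class Table(name: String) extends GraphElement
case class Flow(name: String, destination: String) extends GraphElement

def processNode(node: GraphElement, upstreamNodes: Seq[GraphElement]): Seq[GraphElement] =
  node match {
    // Hypothetical future case: one table node expands into itself
    // plus a derived flow element.
    case t: Table => Seq(t, Flow(s"${t.name}_flow", t.name))
    // Current behavior: every node maps to exactly one processed element.
    case other => Seq(other)
  }
```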
  val materializedFlowIdentifiers: Set[TableIdentifier] = materializedFlows.map(_.identifier).toSet

  /** Returns a [[Table]] given its identifier */
  lazy val table: Map[TableIdentifier, Table] =
`TableIdentifier` only supports a 3-level namespace. Shall we use `Seq[String]` to better support DS v2, which can have an arbitrary number of namespace levels?
`Seq[String]` is a bit hard to use here. We can switch to the DS v2 API after we create an encapsulation class to match `TableIdentifier`.
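To illustrate the trade-off under discussion, a sketch (identifier values invented): `TableIdentifier` caps out at three name parts, while a DS v2 name is an arbitrary-depth multi-part identifier.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// At most catalog.database.table, i.e. three levels.
val v1Id = TableIdentifier("events", Some("db"), Some("main"))

// A DS v2 multi-part identifier can nest arbitrarily deep; Seq[String]
// (or a small wrapper class around it) can represent that directly.
val v2Name: Seq[String] = Seq("main", "org", "team", "events")
```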
   * construct it.
   * @param batchInputs the complete inputs read by the flow
   * @param streamingInputs the incremental inputs read by the flow
   * @param usedExternalInputs the identifiers of the external inputs read by the flow
can we put some examples? I don't quite understand what it is given it's just a string.
Hm, not sure what exactly you mean by examples. An input is just a string name of a table, and here, this just refers to tables (or views, etc.) outside of this pipeline.
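For instance (names invented for illustration), a flow that joins a pipeline-defined view against upstream catalog tables might record something like:

```scala
// Datasets read by the flow but defined outside this pipeline,
// identified by plain string names.
val usedExternalInputs: Set[String] = Set("main.raw.events", "main.lookup.countries")
```

(The exact collection type is an assumption; the point is that each entry is just a table or view name.)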
It looks like `usedExternalInputs` actually isn't used anywhere. Can we just take it out?
If it's for tables/views, why not use `TableIdentifier` instead of `String`?
  /**
   * Normalized storage location used for storing materializations for this [[Output]].
   * If [[None]], it means this [[Output]] has not been normalized yet.
the output must be a file source?
Doesn't have to be a file, but we expect all tables to have a file path.
A JDBC table doesn't have a file path. Maybe we should be clear that the pipeline sink table must be file-source based.
@@ -1949,6 +1955,12 @@
      ],
      "sqlState" : "42818"
    },
    "INCOMPATIBLE_BATCH_VIEW_READ" : {
      "message" : [
        "View <datasetIdentifier> is not a batch view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
"View <datasetIdentifier> is not a batch view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false." | |
"View <datasetIdentifier> is a batch view and must be referenced using `SparkSession#read`. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false." |
@@ -2025,6 +2037,12 @@
      ],
      "sqlState" : "42613"
    },
    "INCOMPATIBLE_STREAMING_VIEW_READ" : {
      "message" : [
        "View <datasetIdentifier> is a streaming view and must be referenced using readStream. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
"View <datasetIdentifier> is a streaming view and must be referenced using readStream. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false." | |
"View <datasetIdentifier> is a streaming view and must be referenced using `SparkSession#readStream`. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false." |
      val resolvedFlow = flowResolver.attemptResolveFlow(
        flow,
        rawGraph.inputIdentifiers,
        resolvedInputs.asScala.toMap
Shall we let `attemptResolveFlow` take a Java hash map, so that we can save this Java -> Scala conversion?
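What that suggestion would save, sketched with illustrative element types (the real signatures in the PR may differ):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

val resolvedInputs = new ConcurrentHashMap[String, String]()

// Today: each call materializes a full Scala copy of the Java map (an O(n) copy).
val scalaCopy: Map[String, String] = resolvedInputs.asScala.toMap

// Suggested: have the callee accept the Java map directly and skip the copy.
def attemptResolveFlow(inputs: java.util.Map[String, String]): Unit = ()
```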
        isStreamingTableOpt = Option(resolvedFlowsToTable.exists(f => f.df.isStreaming))
      )

      // Table will be virtual in either of the following scenarios:
But we always create `VirtualTableInput`; I don't quite understand the conditions here.
        f.inputs.toSeq
          .map(availableResolvedInputs(_))
          .filter {
            // Input is a flow implies that the upstream table is a View.
I find it hard to understand this comment. We are resolving a flow, and the input of a flow can be other flows? Why does that mean the upstream table is a view?
What changes were proposed in this pull request?

This PR introduces the DataflowGraph, a container for Declarative Pipelines datasets and flows, as described in the Declarative Pipelines SPIP. It also adds functionality for registering datasets and flows against a graph (via GraphRegistrationContext) and for resolving and validating the graph.

It also introduces various secondary changes:
- Changes to SparkBuild to support declarative pipelines.
- A pom.xml for the module.

Why are the changes needed?

In order to implement Declarative Pipelines.

Does this PR introduce any user-facing change?

No changes to existing behavior.

How was this patch tested?

New test suites:
- ConnectValidPipelineSuite – test cases where the graph can be successfully resolved
- ConnectInvalidPipelineSuite – test cases where the graph fails to be resolved

Was this patch authored or co-authored using generative AI tooling?

No