
Commit b196c24

SCHJonathan, gengliangwang, and sryza committed
[SPARK-52346][SQL] Declarative Pipeline DataflowGraph execution and event logging
### What changes were proposed in this pull request?

**See the flow chart describing the changes made in this PR: [flow chart link](https://lucid.app/lucidchart/c773b051-c634-4f0e-9a3c-a21e24ae540a/edit?viewport_loc=-4594%2C-78%2C5884%2C3280%2C0_0&invitationId=inv_3f036b9d-1a2a-4dd9-bf50-084cd90e5460)**

As described in the [Declarative Pipelines SPIP](https://docs.google.com/document/d/1PsSTngFuRVEOvUGzp_25CQL1yfzFHFr02XdMfQ7jOM4/edit?tab=t.0#heading=h.9g6a5f8v6xig), after we parse the user's code and represent datasets and dataflows in a `DataflowGraph` (from PR #51003), we execute the `DataflowGraph`. This PR implements that execution.

## Main execution steps inside a pipeline run

### Step 1: Initialize the raw `DataflowGraph`

In `PipelineExecution::runPipeline()`, we first initialize the dataflow graph by topologically sorting the dependencies and determining the expected metadata (e.g., schema) for each dataset (`DataflowGraph::resolve()`). We also run pre-flight validations to catch early errors such as circular dependencies or creating a streaming table from a batch data source (`DataflowGraph::validate()`).

### Step 2: Materialize datasets defined in the `DataflowGraph` to the catalog

After the graph is topologically sorted and validated and every dataset / flow has correct metadata populated, we publish the corresponding datasets to the catalog (which could be Hive, UC, or others) in `DatasetManager::materializeDatasets()`. For example, for each Materialized View and Table, we register an empty table in the catalog with the correct metadata (e.g., table schema, table properties). If the table already exists, we alter it to have the correct metadata.

### Step 3: Populate data to the registered tables by executing the `DataflowGraph`

After datasets have been registered to the catalog, inside `TriggeredGraphExecution` we transform each dataflow defined in the `DataflowGraph` into an actual execution plan that runs the workload and populates the empty tables (each `Flow` is turned into a `FlowExecution` by the `FlowPlanner`). Each `FlowExecution` is executed in topological order based on the sorted `DataflowGraph`, and we parallelize the execution as much as possible. Depending on the type of error, failed flows may be retried as part of execution.
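To make the three steps concrete, here is a minimal, self-contained sketch of the overall control flow. Every name in it is an illustrative stand-in (the real classes are `PipelineExecution`, `DatasetManager`, and `TriggeredGraphExecution`, described below), and the ordering logic is a naive repeated scan rather than the actual resolution code.

```scala
// Illustrative sketch only; stand-in types, not the classes added in this PR.
object PipelineRunSketch {

  // A flow reads from some inputs and writes to a single destination dataset.
  final case class Flow(name: String, destination: String, inputs: Set[String])

  // Step 1 (sketch): order flows so each runs only after the flows producing its
  // internal inputs; fails fast on circular dependencies.
  def resolve(flows: Seq[Flow]): Seq[Flow] = {
    val internal = flows.map(_.destination).toSet
    val ordered = scala.collection.mutable.ArrayBuffer.empty[Flow]
    val produced = scala.collection.mutable.Set.empty[String]
    var remaining = flows
    while (remaining.nonEmpty) {
      val (ready, blocked) =
        remaining.partition(f => f.inputs.intersect(internal).subsetOf(produced))
      require(ready.nonEmpty,
        s"circular dependency among flows: ${remaining.map(_.name).mkString(", ")}")
      ordered ++= ready
      produced ++= ready.map(_.destination)
      remaining = blocked
    }
    ordered.toSeq
  }

  def runPipeline(flows: Seq[Flow]): Unit = {
    val sorted = resolve(flows)                            // Step 1: sort + validate
    sorted.map(_.destination).distinct.foreach { table =>  // Step 2: materialize datasets
      println(s"materializing empty table '$table' in the catalog")
    }
    sorted.foreach { f =>                                  // Step 3: execute flows in order
      println(s"executing flow '${f.name}' into '${f.destination}'")
    }
  }

  def main(args: Array[String]): Unit = {
    runPipeline(Seq(
      Flow("silver_flow", destination = "silver", inputs = Set("bronze")),
      Flow("bronze_flow", destination = "bronze", inputs = Set("raw_source")),
      Flow("gold_flow", destination = "gold", inputs = Set("silver", "bronze"))))
  }
}
```

The real execution additionally runs flows in parallel where the topology allows it and retries failed flows, as described under `TriggeredGraphExecution` below.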
## Main components of this PR:

- **Flow execution** represents the execution of an individual flow in the dataflow graph. Relevant classes:
  - `FlowExecution`
  - `StreamingFlowExecution`
  - `BatchFlowExecution`
  - `FlowPlanner` – constructs `FlowExecution`s from `Flow` objects
- **Graph execution** represents the execution of an entire dataflow graph, i.e. step 3 in the set of steps above. In the future, we will add a `ContinuousGraphExecution` class, which executes all the streams at once instead of in topological order. Relevant classes:
  - `GraphExecution`
  - `TriggeredGraphExecution` – executes flows in topological order, handles retries when necessary
  - `BackoffStrategy` – used for retries
  - `UncaughtExceptionHandler`
  - `PipelineConf` – a few configurations that control graph execution behavior
- **Pipeline execution** represents a full "run", including all three execution steps above: graph resolution, catalog materialization, and graph execution. Relevant classes:
  - `PipelineExecution`
  - `RunTerminationReason`
  - `PipelineUpdateContext` – represents the parameters to a pipeline execution
  - `PipelineUpdateContextImpl`
- **Catalog materialization** – step 2 in the execution steps described above; represents datasets in the dataflow graph in the catalog. Uses DSv2 APIs. Relevant classes:
  - `DatasetManager`
- **Graph filtration / selection** allows selecting just a subset of the graph to be executed. In a follow-up, we will add the plumbing that allows specifying this from the CLI. Relevant classes:
  - `GraphFilter`
- **Events** track the progress of a pipeline execution. The event messages are sent to the client for console logging, and the structured events are available for assertions inside tests. Eventually, these could also power information in the Spark UI. Relevant classes:
  - `FlowProgressEventLogger`
  - `PipelineRunEventBuffer`
  - `StreamListener`
  - `ConstructPipelineEvent`

### Why are the changes needed?

This PR implements the core functionality for executing a Declarative Pipeline.

### Does this PR introduce _any_ user-facing change?

It introduces new behavior, but does not modify existing behavior.

### How was this patch tested?

New unit test suites:
- `TriggeredGraphExecutionSuite`: tests end-to-end executions of the pipeline under different scenarios (happy path, failure path, etc.) and validates that the proper data has been written and the proper event log is emitted.
- `MaterializeTablesSuite`: tests the logic for registering datasets in the catalog.

Augmented existing test suites:
- `ConstructPipelineEventSuite` and `PipelineEventSuite` to validate the new FlowProgress event log we're introducing.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51050 from SCHJonathan/graph-execution.

Lead-authored-by: Yuheng Chang <jonathanyuheng@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Co-authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent ced7b03 commit b196c24

36 files changed: +4963 −32 lines

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 7 additions & 0 deletions
@@ -498,6 +498,13 @@
     },
     "sqlState" : "0A000"
   },
+  "CANNOT_UPDATE_PARTITION_COLUMNS" : {
+    "message" : [
+      "Declared partitioning <requestedPartitionColumns> conflicts with existing table partitioning <existingPartitionColumns>.",
+      "Please delete the table or change the declared partitioning to match its partitions."
+    ],
+    "sqlState" : "42000"
+  },
   "CANNOT_UP_CAST_DATATYPE" : {
     "message" : [
       "Cannot up cast <expression> from <sourceType> to <targetType>.",

common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala

Lines changed: 1 addition & 0 deletions
@@ -291,6 +291,7 @@ private[spark] object LogKeys {
   case object FINAL_PATH extends LogKey
   case object FINISH_TIME extends LogKey
   case object FINISH_TRIGGER_DURATION extends LogKey
+  case object FLOW_NAME extends LogKey
   case object FREE_MEMORY_SIZE extends LogKey
   case object FROM_OFFSET extends LogKey
   case object FROM_TIME extends LogKey
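The new key plugs into Spark's structured logging, where log messages tag values with an `MDC` keyed by a `LogKey`. A hedged sketch of a call site (the class, method, and message are made up, and the exact imports may differ slightly from what a given module needs):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical component that logs flow lifecycle events with the FLOW_NAME key,
// so the flow name is emitted as a structured field rather than free text.
class FlowLoggingSketch extends Logging {
  def onFlowStarted(flowName: String): Unit = {
    logInfo(log"Flow ${MDC(LogKeys.FLOW_NAME, flowName)} started execution")
  }
}
```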

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 64 additions & 0 deletions
@@ -5885,6 +5885,70 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val PIPELINES_STREAM_STATE_POLLING_INTERVAL = {
+    buildConf("spark.sql.pipelines.execution.streamstate.pollingInterval")
+      .doc(
+        "Interval in seconds at which the stream state is polled for changes. This is used to " +
+        "check if the stream has failed and needs to be restarted."
+      )
+      .version("4.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(1)
+  }
+
+  val PIPELINES_WATCHDOG_MIN_RETRY_TIME_IN_SECONDS = {
+    buildConf("spark.sql.pipelines.execution.watchdog.minRetryTime")
+      .doc(
+        "Initial duration in seconds between the time when we notice a flow has failed and " +
+        "when we try to restart the flow. The interval between flow restarts doubles with " +
+        "every stream failure up to the maximum value set in " +
+        "`pipelines.execution.watchdog.maxRetryTime`."
+      )
+      .version("4.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(v => v > 0, "Watchdog minimum retry time must be at least 1 second.")
+      .createWithDefault(5)
+  }
+
+  val PIPELINES_WATCHDOG_MAX_RETRY_TIME_IN_SECONDS = {
+    buildConf("spark.sql.pipelines.execution.watchdog.maxRetryTime")
+      .doc(
+        "Maximum time interval in seconds at which flows will be restarted."
+      )
+      .version("4.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(3600)
+  }
+
+  val PIPELINES_MAX_CONCURRENT_FLOWS = {
+    buildConf("spark.sql.pipelines.execution.maxConcurrentFlows")
+      .doc(
+        "Max number of flows to execute at once. Used to tune performance for triggered " +
+        "pipelines. Has no effect on continuous pipelines."
+      )
+      .version("4.1.0")
+      .intConf
+      .createWithDefault(16)
+  }
+
+
+  val PIPELINES_TIMEOUT_MS_FOR_TERMINATION_JOIN_AND_LOCK = {
+    buildConf("spark.sql.pipelines.timeoutMsForTerminationJoinAndLock")
+      .doc("Timeout in milliseconds to grab a lock for stopping update - default is 1hr.")
+      .version("4.1.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(v => v > 0L, "Timeout for lock must be at least 1 millisecond.")
+      .createWithDefault(60 * 60 * 1000)
+  }
+
+  val PIPELINES_MAX_FLOW_RETRY_ATTEMPTS = {
+    buildConf("spark.sql.pipelines.maxFlowRetryAttempts")
+      .doc("Maximum number of times a flow can be retried")
+      .version("4.1.0")
+      .intConf
+      .createWithDefault(2)
+  }
+
   /**
    * Holds information about keys that have been deprecated.
    *
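The two watchdog confs above together describe the retry policy: the delay before restarting a failed flow starts at the minimum, doubles with each consecutive failure, and is capped at the maximum. A minimal sketch of that calculation, assuming only the behavior stated in the doc strings (the helper name and signature are made up; the PR's `BackoffStrategy` may compute this differently):

```scala
// Illustrative helper, not part of this PR's API: the delay doubles per consecutive
// failure, starting at the watchdog minimum and clamped at the watchdog maximum.
// Defaults mirror the confs above: 5 seconds and 3600 seconds.
object WatchdogBackoffSketch {
  def nextRetryDelaySec(
      consecutiveFailures: Int,
      minRetryTimeSec: Long = 5L,
      maxRetryTimeSec: Long = 3600L): Long = {
    require(consecutiveFailures >= 1, "expected to be called after at least one failure")
    // Double once per failure beyond the first, clamping at every step.
    (1 until consecutiveFailures).foldLeft(minRetryTimeSec) { (delay, _) =>
      math.min(delay * 2, maxRetryTimeSec)
    }
  }
}
```

With the defaults this yields 5s, 10s, 20s, ..., reaching the 3600s cap by the 11th consecutive failure.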

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/common/GraphStates.scala

Lines changed: 16 additions & 0 deletions
@@ -43,6 +43,22 @@ object FlowStatus {
   case object IDLE extends FlowStatus
 }

+sealed trait RunState
+
+object RunState {
+  // Run is currently executing queries.
+  case object RUNNING extends RunState
+
+  // Run is complete and all necessary resources are cleaned up.
+  case object COMPLETED extends RunState
+
+  // Run has run into an error that could not be recovered from.
+  case object FAILED extends RunState
+
+  // Run was canceled.
+  case object CANCELED extends RunState
+}
+
 // The type of the dataset.
 sealed trait DatasetType
 object DatasetType {
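Modeling run states as a sealed trait lets downstream code match exhaustively, so the compiler flags any state a caller forgets to handle. A hedged sketch of a helper one might build on the new `RunState` ADT (the helper itself is hypothetical; only the states come from the diff above):

```scala
import org.apache.spark.sql.pipelines.common.RunState

object RunStateSketch {
  // Hypothetical helper: a run is terminal once it can no longer make progress.
  def isTerminal(state: RunState): Boolean = state match {
    case RunState.RUNNING                                         => false
    case RunState.COMPLETED | RunState.FAILED | RunState.CANCELED => true
  }
}
```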

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala

Lines changed: 1 addition & 1 deletion
@@ -199,7 +199,7 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
    * streaming tables without a query; such tables should still have at least one flow
    * writing to it.
    */
-  def validateEveryDatasetHasFlow(): Unit = {
+  private def validateEveryDatasetHasFlow(): Unit = {
     (tables.map(_.identifier) ++ views.map(_.identifier)).foreach { identifier =>
       if (!flows.exists(_.destinationIdentifier == identifier)) {
         throw new AnalysisException(
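With `validateEveryDatasetHasFlow()` now private, the check is reachable only through the graph's public validation entry point. A self-contained sketch of that pattern with stand-in types (the `validate()` wiring and error type here are illustrative, not the actual `DataflowGraph` code):

```scala
// Stand-in types; the real code checks Table / View / Flow identifiers and raises
// an AnalysisException.
final case class FlowDef(name: String, destination: String)

final case class GraphSketch(datasets: Seq[String], flows: Seq[FlowDef]) {

  // Public entry point: callers run all pre-flight checks through validate().
  def validate(): Unit = {
    validateEveryDatasetHasFlow()
    // ... other checks (cycles, streaming tables over batch sources, etc.) chain here.
  }

  // Private helper mirroring the diff: every declared dataset must have at least
  // one flow writing to it.
  private def validateEveryDatasetHasFlow(): Unit = {
    val destinations = flows.map(_.destination).toSet
    datasets.filterNot(destinations.contains).foreach { d =>
      throw new IllegalArgumentException(s"Dataset $d has no flow writing to it.")
    }
  }
}
```

For example, `GraphSketch(Seq("bronze", "gold"), Seq(FlowDef("f1", "bronze"))).validate()` fails because `gold` has no flow writing to it.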
