
Commit f66e290

Declarative Pipelines (nearly) complete 😉
1 parent 8b93a9a commit f66e290

15 files changed: +510 -21 lines changed

‎docs/SparkSession.md

Lines changed: 26 additions & 18 deletions
@@ -52,7 +52,15 @@ val spark = SparkSession.builder
 * `SparkSession.Builder` is requested to [getOrCreate](SparkSession-Builder.md#getOrCreate)
 * Indirectly using [newSession](#newSession) or [cloneSession](#cloneSession)
 
-## <span id="sessionState"> SessionState
+## StreamingQueryManager { #streams }
+
+```scala
+streams: StreamingQueryManager
+```
+
+`streams` requests this [SessionState](#sessionState) for the [StreamingQueryManager](SessionState.md#streamingQueryManager)
+
+## SessionState { #sessionState }
 
 ```scala
 sessionState: SessionState
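
A quick way to see the new `streams` accessor in action is to list the active streaming queries of a session. A minimal sketch (assuming a `SparkSession` named `spark` with some streaming queries already started):

```scala
import org.apache.spark.sql.streaming.StreamingQueryManager

val sqm: StreamingQueryManager = spark.streams

// Print the name, id and status of every active streaming query in this session
sqm.active.foreach { q =>
  println(s"${q.name} (id=${q.id}) isActive=${q.isActive}")
}
```
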
@@ -65,7 +73,7 @@ Internally, `sessionState` <<SessionState.md#clone, clones>> the optional <<pare
 * *in-memory* (default) for SessionStateBuilder.md[org.apache.spark.sql.internal.SessionStateBuilder]
 * *hive* for hive/HiveSessionStateBuilder.md[org.apache.spark.sql.hive.HiveSessionStateBuilder]
 
-## <span id="newSession"> Creating New SparkSession
+## Creating New SparkSession { #newSession }
 
 ```scala
 newSession(): SparkSession
@@ -80,7 +88,7 @@ newSession(): SparkSession
 !!! note "SparkSession.newSession and SparkSession.cloneSession"
     `SparkSession.newSession` uses no parent [SessionState](#parentSessionState) while [SparkSession.cloneSession](#cloneSession) (re)uses [SessionState](#sessionState).
 
-## <span id="cloneSession"> Cloning SparkSession
+## Cloning SparkSession { #cloneSession }
 
 ```scala
 cloneSession(): SparkSession
@@ -119,7 +127,7 @@ version: String
 
 Internally, `version` uses `spark.SPARK_VERSION` value that is the `version` property in `spark-version-info.properties` properties file on CLASSPATH.
 
-## <span id="emptyDataset"> Creating Empty Dataset (Given Encoder)
+## Creating Empty Dataset (Given Encoder) { #emptyDataset }
 
 ```scala
 emptyDataset[T: Encoder]: Dataset[T]
@@ -138,7 +146,7 @@ root
 
 `emptyDataset` creates a [LocalRelation](logical-operators/LocalRelation.md) logical operator.
 
-## <span id="createDataset"> Creating Dataset from Local Collections or RDDs
+## Creating Dataset from Local Collections or RDDs { #createDataset }
 
 ```scala
 createDataset[T : Encoder](
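
For illustration, both creators can be exercised from a Spark shell-like session (a sketch; assumes the implicit encoders from `spark.implicits._`):

```scala
import spark.implicits._

// Empty Dataset with the String encoder
val empty = spark.emptyDataset[String]
empty.printSchema()

// Dataset from a local collection
val words = spark.createDataset(Seq("hello", "spark", "pipelines"))
words.show()
```
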
@@ -246,7 +254,7 @@ scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
 
 Internally, it is simply an alias for [SessionState.udfRegistration](SessionState.md#udfRegistration).
 
-## <span id="table"> Loading Data From Table
+## Loading Data From Table { #table }
 
 ```scala
 table(
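
A small sketch of `table` in use (the temp view name `t1` and its rows are made up for the example):

```scala
import spark.implicits._

// Register a demo table and load it back by name
Seq((1, "one"), (2, "two")).toDF("id", "name").createOrReplaceTempView("t1")

val t1 = spark.table("t1")
t1.show()
```
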
@@ -282,7 +290,7 @@ catalog: Catalog
 ??? note "lazy value"
     `catalog` is a Scala lazy value which is computed once when accessed and cached afterwards.
 
-## <span id="read"> DataFrameReader
+## DataFrameReader { #read }
 
 ```scala
 read: DataFrameReader
@@ -295,7 +303,7 @@ val spark: SparkSession = ... // create instance
 val dfReader: DataFrameReader = spark.read
 ```
 
-## <span id="conf"> Runtime Configuration
+## Runtime Configuration { #conf }
 
 ```scala
 conf: RuntimeConfig
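
As a sketch, `conf` gives runtime (session-scoped) access to configuration properties (the property name below is just a common example):

```scala
// Read and change a session-scoped property at runtime
val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions")
println(s"spark.sql.shuffle.partitions = $shufflePartitions")

spark.conf.set("spark.sql.shuffle.partitions", "10")
```
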
@@ -305,7 +313,7 @@ conf: RuntimeConfig
 
 Internally, `conf` creates a [RuntimeConfig](RuntimeConfig.md) (when requested the very first time and cached afterwards) with the [SQLConf](SessionState.md#conf) (of the [SessionState](#sessionState)).
 
-## <span id="experimentalMethods"> ExperimentalMethods
+## ExperimentalMethods { #experimentalMethods }
 
 ```scala
 experimental: ExperimentalMethods
@@ -315,7 +323,7 @@ experimental: ExperimentalMethods
 
 `experimental` is used in [SparkPlanner](SparkPlanner.md) and [SparkOptimizer](SparkOptimizer.md).
 
-## <span id="baseRelationToDataFrame"> Create DataFrame for BaseRelation
+## Create DataFrame for BaseRelation { #baseRelationToDataFrame }
 
 ```scala
 baseRelationToDataFrame(
@@ -330,7 +338,7 @@ Internally, `baseRelationToDataFrame` creates a [DataFrame](DataFrame.md) from t
 * `TextInputCSVDataSource` creates a base `Dataset` (of Strings)
 * `TextInputJsonDataSource` creates a base `Dataset` (of Strings)
 
-## <span id="instantiateSessionState"> Creating SessionState
+## Creating SessionState { #instantiateSessionState }
 
 ```scala
 instantiateSessionState(
@@ -348,7 +356,7 @@ Error while instantiating '[className]'
 
 `instantiateSessionState` is used when `SparkSession` is requested for [SessionState](#sessionState) (based on [spark.sql.catalogImplementation](StaticSQLConf.md#spark.sql.catalogImplementation) configuration property).
 
-## <span id="sessionStateClassName"> sessionStateClassName
+## sessionStateClassName { #sessionStateClassName }
 
 ```scala
 sessionStateClassName(
@@ -362,7 +370,7 @@ sessionStateClassName(
 
 `sessionStateClassName` is used when `SparkSession` is requested for the [SessionState](#sessionState) (and one is not available yet).
 
-## <span id="internalCreateDataFrame"> Creating DataFrame From RDD Of Internal Binary Rows and Schema
+## Creating DataFrame From RDD Of Internal Binary Rows and Schema { #internalCreateDataFrame }
 
 ```scala
 internalCreateDataFrame(
@@ -381,31 +389,31 @@ internalCreateDataFrame(
 
 * [InsertIntoDataSourceCommand](logical-operators/InsertIntoDataSourceCommand.md) logical command is executed
 
-## <span id="listenerManager"> ExecutionListenerManager
+## ExecutionListenerManager { #listenerManager }
 
 ```scala
 listenerManager: ExecutionListenerManager
 ```
 
 [ExecutionListenerManager](ExecutionListenerManager.md)
 
-## <span id="sharedState"> SharedState
+## SharedState { #sharedState }
 
 ```scala
 sharedState: SharedState
 ```
 
 [SharedState](SharedState.md)
 
-## <span id="time"> Measuring Duration of Executing Code Block
+## Measuring Duration of Executing Code Block { #time }
 
 ```scala
 time[T](f: => T): T
 ```
 
 `time` executes a code block and prints out (to standard output) the time taken to execute it
 
-## <span id="applyExtensions"> Applying SparkSessionExtensions
+## Applying SparkSessionExtensions { #applyExtensions }
 
 ```scala
 applyExtensions(
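
For illustration, `time` can wrap any code block, e.g. a simple aggregation (the query is arbitrary; the elapsed time goes to standard output):

```scala
// Time an arbitrary code block; the result of the block is returned unchanged
val result = spark.time {
  spark.range(0, 1000000).selectExpr("sum(id)").collect()
}
```
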
@@ -437,7 +445,7 @@ Cannot use [extensionConfClassName] to configure session extensions.
 * `SparkSession.Builder` is requested to [get active or create a new SparkSession instance](SparkSession-Builder.md#getOrCreate)
 * `SparkSession` is [created](#creating-instance) (from a `SparkContext`)
 
-## <span id="leafNodeDefaultParallelism"> Default Parallelism of Leaf Nodes
+## Default Parallelism of Leaf Nodes { #leafNodeDefaultParallelism }
 
 ```scala
 leafNodeDefaultParallelism: Int

‎docs/declarative-pipelines/DataflowGraph.md

Lines changed: 29 additions & 0 deletions
@@ -28,3 +28,32 @@ reanalyzeFlow(
 
 * `BatchTableWrite` is requested to [executeInternal](BatchTableWrite.md#executeInternal)
 * `StreamingTableWrite` is requested to [startStream](StreamingTableWrite.md#startStream)
+
+## Resolve { #resolve }
+
+```scala
+resolve(): DataflowGraph
+```
+
+`resolve`...FIXME
+
+---
+
+`resolve` is used when:
+
+* `DataflowGraph` is requested to [reanalyzeFlow](#reanalyzeFlow)
+* `PipelineExecution` is requested to [initializeGraph](PipelineExecution.md#initializeGraph)
+
+## Validate { #validate }
+
+```scala
+validate(): DataflowGraph
+```
+
+`validate`...FIXME
+
+---
+
+`validate` is used when:
+
+* `PipelineExecution` is requested to [initialize the dataflow graph](PipelineExecution.md#initializeGraph)

‎docs/declarative-pipelines/DataflowGraphRegistry.md

Lines changed: 44 additions & 0 deletions
@@ -1,5 +1,7 @@
 # DataflowGraphRegistry
 
+`DataflowGraphRegistry` is a registry of [Dataflow Graphs](#dataflowGraphs).
+
 !!! note "Scala object"
     `DataflowGraphRegistry` is an `object` in Scala which means it is a class that has exactly one instance (itself).
     A Scala `object` is created lazily when it is referenced for the first time.
@@ -17,6 +19,14 @@ val graphId = DataflowGraphRegistry.createDataflowGraph(
   defaultSqlConf=Map.empty)
 ```
 
+## Dataflow Graphs { #dataflowGraphs }
+
+```scala
+dataflowGraphs: ConcurrentHashMap[String, GraphRegistrationContext]
+```
+
+`DataflowGraphRegistry` creates an empty collection of [GraphRegistrationContext](GraphRegistrationContext.md)s by their UUIDs.
+
 ## createDataflowGraph { #createDataflowGraph }
 
 ```scala
@@ -33,3 +43,37 @@ createDataflowGraph(
 `createDataflowGraph` is used when:
 
 * `PipelinesHandler` ([Spark Connect]({{ book.spark_connect }})) is requested to [createDataflowGraph](PipelinesHandler.md#createDataflowGraph)
+
+## Find Dataflow Graph (or Throw SparkException) { #getDataflowGraphOrThrow }
+
+```scala
+getDataflowGraphOrThrow(
+  dataflowGraphId: String): GraphRegistrationContext
+```
+
+`getDataflowGraphOrThrow` [looks up the GraphRegistrationContext](#getDataflowGraph) for the given `dataflowGraphId` or throws a `SparkException` if it does not exist.
+
+```text
+Dataflow graph with id [graphId] could not be found
+```
+
+---
+
+`getDataflowGraphOrThrow` is used when:
+
+* `PipelinesHandler` ([Spark Connect]({{ book.spark_connect }})) is requested to [defineDataset](PipelinesHandler.md#defineDataset), [defineFlow](PipelinesHandler.md#defineFlow), [defineSqlGraphElements](PipelinesHandler.md#defineSqlGraphElements), [startRun](PipelinesHandler.md#startRun)
+
+## Find Dataflow Graph { #getDataflowGraph }
+
+```scala
+getDataflowGraph(
+  graphId: String): Option[GraphRegistrationContext]
+```
+
+`getDataflowGraph` finds the [GraphRegistrationContext](GraphRegistrationContext.md) for the given `graphId` (in this [dataflowGraphs](#dataflowGraphs) registry).
+
+---
+
+`getDataflowGraph` is used when:
+
+* `DataflowGraphRegistry` is requested to [getDataflowGraphOrThrow](#getDataflowGraphOrThrow)
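
A minimal sketch of the lookup path described above, assuming a `graphId` previously returned by `createDataflowGraph` (imports and the surrounding setup are omitted):

```scala
// Optional lookup
DataflowGraphRegistry.getDataflowGraph(graphId) match {
  case Some(ctx) => println(s"Found graph registration context: $ctx")
  case None      => println(s"No dataflow graph registered under $graphId")
}

// Fail-fast lookup that throws a SparkException for an unknown id
val ctx = DataflowGraphRegistry.getDataflowGraphOrThrow(graphId)
```
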
‎docs/declarative-pipelines/DatasetManager.md

Lines changed: 50 additions & 1 deletion
@@ -1,3 +1,52 @@
 # DatasetManager
 
-`DatasetManager` is...FIXME
+!!! note "Scala object"
+    `DatasetManager` is an `object` in Scala which means it is a class that has exactly one instance (itself).
+    A Scala `object` is created lazily when it is referenced for the first time.
+
+    Learn more in [Tour of Scala](https://docs.scala-lang.org/tour/singleton-objects.html).
+
+## materializeDatasets { #materializeDatasets }
+
+```scala
+materializeDatasets(
+  resolvedDataflowGraph: DataflowGraph,
+  context: PipelineUpdateContext): DataflowGraph
+```
+
+`materializeDatasets`...FIXME
+
+---
+
+`materializeDatasets` is used when:
+
+* `PipelineExecution` is requested to [initialize the dataflow graph](PipelineExecution.md#initializeGraph)
+
+## constructFullRefreshSet { #constructFullRefreshSet }
+
+```scala
+constructFullRefreshSet(
+  graphTables: Seq[Table],
+  context: PipelineUpdateContext): (Seq[Table], Seq[TableIdentifier], Seq[TableIdentifier])
+```
+
+`constructFullRefreshSet` gives the following collections:
+
+* [Table](Table.md)s to be refreshed (incl. a full refresh)
+* `TableIdentifier`s of the tables to be refreshed (excl. fully refreshed)
+* `TableIdentifier`s of the tables to be fully refreshed only
+
+If there are tables to be fully refreshed yet not allowed for a full refresh, `constructFullRefreshSet` prints out the following INFO message to the logs:
+
+```text
+Skipping full refresh on some tables because pipelines.reset.allowed was set to false.
+Tables: [fullRefreshNotAllowed]
+```
+
+`constructFullRefreshSet`...FIXME
+
+---
+
+`constructFullRefreshSet` is used when:
+
+* `PipelineExecution` is requested to [initialize the dataflow graph](PipelineExecution.md#initializeGraph)
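
A sketch of consuming the triple returned by `constructFullRefreshSet` (assumes a resolved `DataflowGraph` named `graph` that exposes its `Table`s as `graph.tables`, and a `PipelineUpdateContext` named `context`; both names are illustrative):

```scala
// Destructure the three collections returned by constructFullRefreshSet
val (tablesToRefresh, refreshIdents, fullRefreshIdents) =
  DatasetManager.constructFullRefreshSet(graph.tables, context)

println(s"Refreshing ${refreshIdents.size} table(s); fully refreshing ${fullRefreshIdents.size} table(s)")
```
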

‎docs/declarative-pipelines/GraphExecution.md

Lines changed: 42 additions & 1 deletion
@@ -2,7 +2,21 @@
 
 `GraphExecution` is an [abstraction](#contract) of [graph executors](#implementations) that can...FIXME
 
-## Contract
+## Contract (Subset) { #contract }
+
+### awaitCompletion { #awaitCompletion }
+
+```scala
+awaitCompletion(): Unit
+```
+
+See:
+
+* [TriggeredGraphExecution](TriggeredGraphExecution.md#awaitCompletion)
+
+Used when:
+
+* `PipelineExecution` is requested to [await completion](PipelineExecution.md#awaitCompletion)
 
 ### streamTrigger { #streamTrigger }
 
@@ -11,6 +25,10 @@ streamTrigger(
   flow: Flow): Trigger
 ```
 
+See:
+
+* [TriggeredGraphExecution](TriggeredGraphExecution.md#streamTrigger)
+
 Used when:
 
 * `GraphExecution` is [created](#creating-instance) (to create the [FlowPlanner](#flowPlanner))
@@ -66,3 +84,26 @@ planAndStartFlow(
 `planAndStartFlow` is used when:
 
 * `TriggeredGraphExecution` is requested to [topologicalExecution](TriggeredGraphExecution.md#topologicalExecution)
+
+## StreamListener { #streamListener }
+
+`GraphExecution` creates a new [StreamListener](StreamListener.md) when [created](#creating-instance).
+
+The `StreamListener` is created for this [PipelineUpdateContext](#env) and [DataflowGraph](#graphForExecution).
+
+The `StreamListener` is registered (_added_) to the session-bound [StreamingQueryManager](../SparkSession.md#streams) when [started](#start), and deregistered (_removed_) when [stopped](#stop).
+
+## Stop { #stop }
+
+```scala
+stop(): Unit
+```
+
+`stop` requests this session-bound [StreamingQueryManager](../SparkSession.md#streams) to remove this [StreamListener](#streamListener).
+
+---
+
+`stop` is used when:
+
+* `PipelineExecution` is requested to [stop the pipeline](PipelineExecution.md#stopPipeline)
+* `TriggeredGraphExecution` is requested to [create the Topological Execution thread](TriggeredGraphExecution.md#buildTopologicalExecutionThread) and [stopInternal](TriggeredGraphExecution.md#stopInternal)
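
The register/deregister dance above follows Spark's standard streaming listener API. A generic sketch with a hypothetical no-op listener standing in for the pipeline's own `StreamListener`:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// A no-op listener used only to illustrate the add/remove lifecycle
val listener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Registered when the graph execution starts...
spark.streams.addListener(listener)

// ...and removed when it is stopped
spark.streams.removeListener(listener)
```
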

0 commit comments
