Commit a678f8b

[SDP] PipelinesHandler (Spark Connect) and GraphRegistrationContext
1 parent ef04c16 commit a678f8b

2 files changed: 87 additions and 8 deletions

docs/declarative-pipelines/GraphRegistrationContext.md

Lines changed: 25 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,8 @@
22

33
`GraphRegistrationContext` is a registry of [tables](#tables), [views](#views), and [flows](#flows) in a dataflow graph.
44

5+
`GraphRegistrationContext` is required to create a new [SqlGraphRegistrationContext](SqlGraphRegistrationContext.md).
6+
57
## Creating Instance
68

79
`GraphRegistrationContext` takes the following to be created:
@@ -49,10 +51,31 @@ registerTable(
4951
tableDef: Table): Unit
5052
```
5153

52-
`registerTable` adds the given [Table](Table.md) to [tables](#tables) registry.
54+
`registerTable` adds the given [Table](Table.md) to the [tables](#tables) registry.
5355

5456
---
5557

5658
`registerTable` is used when:
5759

58-
* `PipelinesHandler` ([Spark Connect]({{ book.spark_connect }})) is requested is requested to [defineDataset](PipelinesHandler.md#defineDataset)
60+
* `PipelinesHandler` ([Spark Connect]({{ book.spark_connect }})) is requested to [handle DEFINE_DATASET command](PipelinesHandler.md#defineDataset)
61+
62+
## Register Flow { #registerFlow }
63+
64+
```scala
65+
registerFlow(
66+
flowDef: UnresolvedFlow): Unit
67+
```
68+
69+
`registerFlow` adds the given [UnresolvedFlow](UnresolvedFlow.md) to the [flows](#flows) registry.
70+
71+
---
72+
73+
`registerFlow` is used when:
74+
75+
* `PipelinesHandler` ([Spark Connect]({{ book.spark_connect }})) is requested to [handle DEFINE_FLOW command](PipelinesHandler.md#defineFlow)
76+
* `SqlGraphRegistrationContext` is requested to [process the following SQL commands](SqlGraphRegistrationContext.md#processSqlQuery):
77+
* `CreateFlowCommand`
78+
* `CreateMaterializedViewAsSelect`
79+
* `CreateView`
80+
* `CreateStreamingTableAsSelect`
81+
* `CreateViewCommand`
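
The registry behaviour described in this file (`registerTable` and `registerFlow` each adding a definition to its own registry) can be illustrated with a minimal sketch. This is not the actual Spark implementation: `RegistrySketch`, `TableDef`, `ViewDef`, `FlowDef`, and the `registered*` accessors are hypothetical stand-ins, and the real `Table` and `UnresolvedFlow` types carry far more than a name.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified stand-ins for the real Table, View and UnresolvedFlow types.
final case class TableDef(name: String)
final case class ViewDef(name: String)
final case class FlowDef(name: String, destination: String)

// A minimal registry sketch (an assumption, not the Spark class):
// every register* method appends the definition to its own registry.
class RegistrySketch {
  private val tables = ListBuffer.empty[TableDef]
  private val views = ListBuffer.empty[ViewDef]
  private val flows = ListBuffer.empty[FlowDef]

  def registerTable(tableDef: TableDef): Unit = { tables += tableDef; () }
  def registerView(viewDef: ViewDef): Unit = { views += viewDef; () }
  def registerFlow(flowDef: FlowDef): Unit = { flows += flowDef; () }

  // Read-only snapshots of the registries, in registration order.
  def registeredTables: Seq[TableDef] = tables.toList
  def registeredViews: Seq[ViewDef] = views.toList
  def registeredFlows: Seq[FlowDef] = flows.toList
}
```

Under these assumptions, registering a table followed by a flow of the same name leaves one entry in each registry and none in the views registry.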

docs/declarative-pipelines/PipelinesHandler.md

Lines changed: 62 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,8 @@
11
# PipelinesHandler
22

3-
## handlePipelinesCommand { #handlePipelinesCommand }
3+
`PipelinesHandler` is used to [handle pipeline commands](#handlePipelinesCommand) in [Spark Connect]({{ book.spark_connect }}) ([SparkConnectPlanner]({{ book.spark_connect }}/server/SparkConnectPlanner), precisely).
4+
5+
## Handle Pipelines Command { #handlePipelinesCommand }
46

57
```scala
68
handlePipelinesCommand(
@@ -10,14 +12,43 @@ handlePipelinesCommand(
1012
transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult
1113
```
1214

13-
`handlePipelinesCommand`...FIXME
15+
`handlePipelinesCommand` handles the given pipeline `cmd` command.
16+
17+
| PipelineCommand | Description |
18+
|-----------------|-------------|
19+
| `CREATE_DATAFLOW_GRAPH` | [Creates a new Dataflow Graph](#createDataflowGraph) |
20+
| `DROP_DATAFLOW_GRAPH` | [Drops a pipeline](#DROP_DATAFLOW_GRAPH) |
21+
| `DEFINE_DATASET` | [Defines a dataset](#DEFINE_DATASET) |
22+
| `DEFINE_FLOW` | [Defines a flow](#DEFINE_FLOW) |
23+
| `START_RUN` | [START_RUN](#START_RUN) |
24+
| `DEFINE_SQL_GRAPH_ELEMENTS` | [DEFINE_SQL_GRAPH_ELEMENTS](#DEFINE_SQL_GRAPH_ELEMENTS) |
1425

1526
---
1627

1728
`handlePipelinesCommand` is used when:
1829

1930
* `SparkConnectPlanner` is requested to `handlePipelineCommand` (for `PIPELINE_COMMAND` command)
2031

32+
### Define Dataset Command { #DEFINE_DATASET }
33+
34+
`handlePipelinesCommand` prints out the following INFO message to the logs:
35+
36+
```text
37+
Define pipelines dataset cmd received: [cmd]
38+
```
39+
40+
`handlePipelinesCommand` [defines a dataset](#defineDataset).
41+
42+
### Define Flow Command { #DEFINE_FLOW }
43+
44+
`handlePipelinesCommand` prints out the following INFO message to the logs:
45+
46+
```text
47+
Define pipelines flow cmd received: [cmd]
48+
```
49+
50+
`handlePipelinesCommand` [defines a flow](#defineFlow).
51+
2152
### startRun { #startRun }
2253

2354
```scala
@@ -37,7 +68,9 @@ createDataflowGraph(
3768
spark: SparkSession): String
3869
```
3970

40-
`createDataflowGraph`...FIXME
71+
`createDataflowGraph` finds the catalog and the database in the given `cmd` command and [creates a dataflow graph](DataflowGraphRegistry.md#createDataflowGraph).
72+
73+
`createDataflowGraph` returns the ID of the created dataflow graph.
4174

4275
### defineSqlGraphElements { #defineSqlGraphElements }
4376

@@ -49,15 +82,28 @@ defineSqlGraphElements(
4982

5083
`defineSqlGraphElements`...FIXME
5184

52-
### defineDataset { #defineDataset }
85+
### Define Dataset (Table or View) { #defineDataset }
5386

5487
```scala
5588
defineDataset(
5689
dataset: proto.PipelineCommand.DefineDataset,
5790
sparkSession: SparkSession): Unit
5891
```
5992

60-
`defineDataset`...FIXME
93+
`defineDataset` looks up the [GraphRegistrationContext](DataflowGraphRegistry.md#getDataflowGraphOrThrow) for the given `dataset` (or throws a `SparkException` if not found).
94+
95+
`defineDataset` branches off based on the `dataset` type:
96+
97+
| Dataset Type | Action |
98+
|--------------|--------|
99+
| `MATERIALIZED_VIEW` or `TABLE` | [Registers a table](GraphRegistrationContext.md#registerTable) |
100+
| `TEMPORARY_VIEW` | [Registers a view](GraphRegistrationContext.md#registerView) |
101+
102+
For unknown types, `defineDataset` reports an `IllegalArgumentException`:
103+
104+
```text
105+
Unknown dataset type: [type]
106+
```
61107

62108
### defineFlow { #defineFlow }
63109

@@ -68,4 +114,14 @@ defineFlow(
68114
sparkSession: SparkSession): Unit
69115
```
70116

71-
`defineFlow`...FIXME
117+
`defineFlow` looks up the [GraphRegistrationContext](DataflowGraphRegistry.md#getDataflowGraphOrThrow) for the given `flow` (or throws a `SparkException` if not found).
118+
119+
!!! note "Implicit Flows"
120+
An **implicit flow** is a flow with the name of the target dataset (i.e. one defined as part of dataset creation).
121+
122+
`defineFlow` [creates a flow identifier](GraphIdentifierManager.md#parseTableIdentifier) (for the `flow` name).
123+
124+
??? note "AnalysisException"
125+
`defineFlow` reports an `AnalysisException` if the given `flow` is not an implicit flow, but is defined with a multi-part identifier.
126+
127+
In the end, `defineFlow` [registers a flow](GraphRegistrationContext.md#registerFlow).
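
The branching that `defineDataset` and `defineFlow` are described as performing can be sketched roughly as follows. Everything here is an assumption made for illustration: the `DatasetKind` hierarchy, the `PipelinesHandlerSketch` object and its method names are hypothetical, actions are returned as labels instead of calling a real `GraphRegistrationContext`, `IllegalArgumentException` stands in for Spark's `AnalysisException`, and the multi-part check is a naive split on dots that ignores quoted identifiers.

```scala
// Hypothetical dataset kinds; Unspecified stands in for any unknown type.
sealed trait DatasetKind
case object MaterializedViewKind extends DatasetKind
case object TableKind extends DatasetKind
case object TemporaryViewKind extends DatasetKind
case object Unspecified extends DatasetKind

object PipelinesHandlerSketch {
  // Returns the name of the registration method that would be called.
  def datasetAction(kind: DatasetKind): String = kind match {
    case MaterializedViewKind | TableKind => "registerTable"
    case TemporaryViewKind => "registerView"
    case other =>
      throw new IllegalArgumentException(s"Unknown dataset type: $other")
  }

  // A flow is implicit when it is named after its target dataset.
  def isImplicitFlow(flowName: String, targetDataset: String): Boolean =
    flowName == targetDataset

  // Rejects a multi-part name unless the flow is implicit.
  // Naive assumption: name parts are separated by plain dots.
  def validateFlowName(flowName: String, targetDataset: String): Unit = {
    val isMultipart = flowName.split('.').length > 1
    if (!isImplicitFlow(flowName, targetDataset) && isMultipart) {
      throw new IllegalArgumentException(
        s"Flow $flowName is not implicit but has a multi-part identifier")
    }
  }
}
```

In this sketch, an implicit flow may keep a qualified name such as `cat.db.sales` because it simply reuses the target dataset's identifier, while a separately named flow must be a single-part name.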
