Skip to content

Commit f77e70d

Browse files
aakash-db and sryza
authored and committed
[SPARK-52223][CONNECT] Add SDP Spark Connect Protos
### What changes were proposed in this pull request? Adds the Spark Connect API for Spark Declarative Pipelines: https://issues.apache.org/jira/browse/SPARK-51727. This adds the following protos: 1. `CreateDataflowGraph` creates a new graph in the registry. 2. `DefineDataset` and `DefineFlow` register elements to the created graph. Datasets are the nodes of the dataflow graph, and are either tables or views, and flows are the edges connecting them. 3. `StartRun` starts a run, which is a single execution of a graph. 4. `StopRun` stops an existing run, while `DropPipeline` stops any current runs and drops the pipeline. It also adds the new `PipelineCommand` object to the `ExecutePlanRequest` and the `PipelineCommand.Response` to the `ExecutePlanResponse` object. ### Why are the changes needed? Base API of Spark Declarative Pipelines. Implementation coming in future PRs. ### Does this PR introduce _any_ user-facing change? Yes - creates new proto API within Spark Connect. ### How was this patch tested? N/A ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#50942 from aakash-db/pipeline-spark-connect-api. Lead-authored-by: Aakash Japi <aakash.japi@databricks.com> Co-authored-by: Sandy Ryza <sandy.ryza@databricks.com> Signed-off-by: Sandy Ryza <sandyryza@gmail.com>
1 parent 9b24d4c commit f77e70d

File tree

9 files changed

+1447
-284
lines changed

9 files changed

+1447
-284
lines changed

python/pyspark/sql/connect/proto/base_pb2.py

Lines changed: 180 additions & 179 deletions
Large diffs are not rendered by default.

python/pyspark/sql/connect/proto/base_pb2.pyi

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import pyspark.sql.connect.proto.commands_pb2
4444
import pyspark.sql.connect.proto.common_pb2
4545
import pyspark.sql.connect.proto.expressions_pb2
4646
import pyspark.sql.connect.proto.ml_pb2
47+
import pyspark.sql.connect.proto.pipelines_pb2
4748
import pyspark.sql.connect.proto.relations_pb2
4849
import pyspark.sql.connect.proto.types_pb2
4950
import sys
@@ -1583,6 +1584,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
15831584
EXECUTION_PROGRESS_FIELD_NUMBER: builtins.int
15841585
CHECKPOINT_COMMAND_RESULT_FIELD_NUMBER: builtins.int
15851586
ML_COMMAND_RESULT_FIELD_NUMBER: builtins.int
1587+
PIPELINE_EVENT_RESULT_FIELD_NUMBER: builtins.int
1588+
PIPELINE_COMMAND_RESULT_FIELD_NUMBER: builtins.int
15861589
EXTENSION_FIELD_NUMBER: builtins.int
15871590
METRICS_FIELD_NUMBER: builtins.int
15881591
OBSERVED_METRICS_FIELD_NUMBER: builtins.int
@@ -1650,6 +1653,14 @@ class ExecutePlanResponse(google.protobuf.message.Message):
16501653
def ml_command_result(self) -> pyspark.sql.connect.proto.ml_pb2.MlCommandResult:
16511654
"""ML command response"""
16521655
@property
1656+
def pipeline_event_result(self) -> pyspark.sql.connect.proto.pipelines_pb2.PipelineEventResult:
1657+
"""Response containing pipeline event that is streamed back to the client during a pipeline run"""
1658+
@property
1659+
def pipeline_command_result(
1660+
self,
1661+
) -> pyspark.sql.connect.proto.pipelines_pb2.PipelineCommandResult:
1662+
"""Pipeline command response"""
1663+
@property
16531664
def extension(self) -> google.protobuf.any_pb2.Any:
16541665
"""Support arbitrary result objects."""
16551666
@property
@@ -1692,6 +1703,10 @@ class ExecutePlanResponse(google.protobuf.message.Message):
16921703
execution_progress: global___ExecutePlanResponse.ExecutionProgress | None = ...,
16931704
checkpoint_command_result: global___CheckpointCommandResult | None = ...,
16941705
ml_command_result: pyspark.sql.connect.proto.ml_pb2.MlCommandResult | None = ...,
1706+
pipeline_event_result: pyspark.sql.connect.proto.pipelines_pb2.PipelineEventResult
1707+
| None = ...,
1708+
pipeline_command_result: pyspark.sql.connect.proto.pipelines_pb2.PipelineCommandResult
1709+
| None = ...,
16951710
extension: google.protobuf.any_pb2.Any | None = ...,
16961711
metrics: global___ExecutePlanResponse.Metrics | None = ...,
16971712
observed_metrics: collections.abc.Iterable[global___ExecutePlanResponse.ObservedMetrics]
@@ -1717,6 +1732,10 @@ class ExecutePlanResponse(google.protobuf.message.Message):
17171732
b"metrics",
17181733
"ml_command_result",
17191734
b"ml_command_result",
1735+
"pipeline_command_result",
1736+
b"pipeline_command_result",
1737+
"pipeline_event_result",
1738+
b"pipeline_event_result",
17201739
"response_type",
17211740
b"response_type",
17221741
"result_complete",
@@ -1758,6 +1777,10 @@ class ExecutePlanResponse(google.protobuf.message.Message):
17581777
b"observed_metrics",
17591778
"operation_id",
17601779
b"operation_id",
1780+
"pipeline_command_result",
1781+
b"pipeline_command_result",
1782+
"pipeline_event_result",
1783+
b"pipeline_event_result",
17611784
"response_id",
17621785
b"response_id",
17631786
"response_type",
@@ -1798,6 +1821,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
17981821
"execution_progress",
17991822
"checkpoint_command_result",
18001823
"ml_command_result",
1824+
"pipeline_event_result",
1825+
"pipeline_command_result",
18011826
"extension",
18021827
]
18031828
| None

python/pyspark/sql/connect/proto/commands_pb2.py

Lines changed: 106 additions & 105 deletions
Large diffs are not rendered by default.

python/pyspark/sql/connect/proto/commands_pb2.pyi

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import google.protobuf.message
4343
import pyspark.sql.connect.proto.common_pb2
4444
import pyspark.sql.connect.proto.expressions_pb2
4545
import pyspark.sql.connect.proto.ml_pb2
46+
import pyspark.sql.connect.proto.pipelines_pb2
4647
import pyspark.sql.connect.proto.relations_pb2
4748
import sys
4849
import typing
@@ -107,6 +108,7 @@ class Command(google.protobuf.message.Message):
107108
MERGE_INTO_TABLE_COMMAND_FIELD_NUMBER: builtins.int
108109
ML_COMMAND_FIELD_NUMBER: builtins.int
109110
EXECUTE_EXTERNAL_COMMAND_FIELD_NUMBER: builtins.int
111+
PIPELINE_COMMAND_FIELD_NUMBER: builtins.int
110112
EXTENSION_FIELD_NUMBER: builtins.int
111113
@property
112114
def register_function(
@@ -153,6 +155,8 @@ class Command(google.protobuf.message.Message):
153155
@property
154156
def execute_external_command(self) -> global___ExecuteExternalCommand: ...
155157
@property
158+
def pipeline_command(self) -> pyspark.sql.connect.proto.pipelines_pb2.PipelineCommand: ...
159+
@property
156160
def extension(self) -> google.protobuf.any_pb2.Any:
157161
"""This field is used to mark extensions to the protocol. When plugins generate arbitrary
158162
Commands they can add them here. During the planning the correct resolution is done.
@@ -183,6 +187,7 @@ class Command(google.protobuf.message.Message):
183187
merge_into_table_command: global___MergeIntoTableCommand | None = ...,
184188
ml_command: pyspark.sql.connect.proto.ml_pb2.MlCommand | None = ...,
185189
execute_external_command: global___ExecuteExternalCommand | None = ...,
190+
pipeline_command: pyspark.sql.connect.proto.pipelines_pb2.PipelineCommand | None = ...,
186191
extension: google.protobuf.any_pb2.Any | None = ...,
187192
) -> None: ...
188193
def HasField(
@@ -206,6 +211,8 @@ class Command(google.protobuf.message.Message):
206211
b"merge_into_table_command",
207212
"ml_command",
208213
b"ml_command",
214+
"pipeline_command",
215+
b"pipeline_command",
209216
"register_data_source",
210217
b"register_data_source",
211218
"register_function",
@@ -251,6 +258,8 @@ class Command(google.protobuf.message.Message):
251258
b"merge_into_table_command",
252259
"ml_command",
253260
b"ml_command",
261+
"pipeline_command",
262+
b"pipeline_command",
254263
"register_data_source",
255264
b"register_data_source",
256265
"register_function",
@@ -297,6 +306,7 @@ class Command(google.protobuf.message.Message):
297306
"merge_into_table_command",
298307
"ml_command",
299308
"execute_external_command",
309+
"pipeline_command",
300310
"extension",
301311
]
302312
| None
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
# -*- coding: utf-8 -*-
18+
# Generated by the protocol buffer compiler. DO NOT EDIT!
19+
# NO CHECKED-IN PROTOBUF GENCODE
20+
# source: spark/connect/pipelines.proto
21+
# Protobuf Python Version: 5.28.3
22+
"""Generated protocol buffer code."""
23+
from google.protobuf import descriptor as _descriptor
24+
from google.protobuf import descriptor_pool as _descriptor_pool
25+
from google.protobuf import runtime_version as _runtime_version
26+
from google.protobuf import symbol_database as _symbol_database
27+
from google.protobuf.internal import builder as _builder
28+
29+
_runtime_version.ValidateProtobufRuntimeVersion(
30+
_runtime_version.Domain.PUBLIC, 5, 28, 3, "", "spark/connect/pipelines.proto"
31+
)
32+
# @@protoc_insertion_point(imports)
33+
34+
_sym_db = _symbol_database.Default()
35+
36+
37+
from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_relations__pb2
38+
from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
39+
40+
41+
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
42+
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x8c\x11\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12\x62\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32%.spark.connect.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xd1\x04\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 
\x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_format\x1a\xbc\x03\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x30\n\x04plan\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x04plan\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12\x17\n\x04once\x18\x06 \x01(\x08H\x04R\x04once\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x07\n\x05_planB\x07\n\x05_once\x1aQ\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0e\n\x0c\x63ommand_type"\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 
\x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"k\n\rPipelineEvent\x12!\n\ttimestamp\x18\x01 \x01(\tH\x00R\ttimestamp\x88\x01\x01\x12\x1d\n\x07message\x18\x02 \x01(\tH\x01R\x07message\x88\x01\x01\x42\x0c\n\n_timestampB\n\n\x08_message*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3'
43+
)
44+
45+
_globals = globals()
46+
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
47+
_builder.BuildTopDescriptorsAndMessages(
48+
DESCRIPTOR, "pyspark.sql.connect.proto.pipelines_pb2", _globals
49+
)
50+
if not _descriptor._USE_C_DESCRIPTORS:
51+
_globals["DESCRIPTOR"]._loaded_options = None
52+
_globals["DESCRIPTOR"]._serialized_options = b"\n\036org.apache.spark.connect.protoP\001"
53+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._loaded_options = None
54+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_options = b"8\001"
55+
_globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._loaded_options = None
56+
_globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_options = b"8\001"
57+
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
58+
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001"
59+
_globals["_DATASETTYPE"]._serialized_start = 2956
60+
_globals["_DATASETTYPE"]._serialized_end = 3053
61+
_globals["_PIPELINECOMMAND"]._serialized_start = 107
62+
_globals["_PIPELINECOMMAND"]._serialized_end = 2295
63+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 670
64+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1061
65+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 879
66+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 937
67+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_RESPONSE"]._serialized_start = 939
68+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_RESPONSE"]._serialized_end = 1020
69+
_globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1063
70+
_globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1153
71+
_globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_start = 1156
72+
_globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 1749
73+
_globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_start = 1593
74+
_globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_end = 1659
75+
_globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 1752
76+
_globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2196
77+
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 879
78+
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 937
79+
_globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2198
80+
_globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2279
81+
_globals["_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2298
82+
_globals["_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2497
83+
_globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2500
84+
_globals["_PIPELINECOMMANDRESULT"]._serialized_end = 2770
85+
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2657
86+
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 2755
87+
_globals["_PIPELINEEVENTRESULT"]._serialized_start = 2772
88+
_globals["_PIPELINEEVENTRESULT"]._serialized_end = 2845
89+
_globals["_PIPELINEEVENT"]._serialized_start = 2847
90+
_globals["_PIPELINEEVENT"]._serialized_end = 2954
91+
# @@protoc_insertion_point(module_scope)

0 commit comments

Comments
 (0)