From 57174fc57190eb93c473d428af8af0acfc49cf27 Mon Sep 17 00:00:00 2001 From: anishm-db Date: Tue, 1 Jul 2025 20:19:00 +0000 Subject: [PATCH 1/4] impl --- .../pyspark/pipelines/source_code_location.py | 24 ++ .../spark_connect_graph_element_registry.py | 12 + .../sql/connect/proto/pipelines_pb2.py | 44 +-- .../sql/connect/proto/pipelines_pb2.pyi | 279 ++++++++++++------ .../protobuf/spark/connect/pipelines.proto | 14 + .../connect/pipelines/PipelinesHandler.scala | 12 + .../pipelines/PythonPipelineSuite.scala | 137 ++++++++- .../graph/CoreDataflowNodeProcessor.scala | 6 +- .../spark/sql/pipelines/graph/Flow.scala | 5 +- .../sql/pipelines/graph/FlowAnalysis.scala | 6 +- .../pipelines/graph/FlowAnalysisContext.scala | 3 +- .../graph/GraphRegistrationContext.scala | 73 +++-- 12 files changed, 470 insertions(+), 145 deletions(-) diff --git a/python/pyspark/pipelines/source_code_location.py b/python/pyspark/pipelines/source_code_location.py index 5f23b819abd85..b546a930f730d 100644 --- a/python/pyspark/pipelines/source_code_location.py +++ b/python/pyspark/pipelines/source_code_location.py @@ -30,6 +30,30 @@ def get_caller_source_code_location(stacklevel: int) -> SourceCodeLocation: """ Returns a SourceCodeLocation object representing the location code that invokes this function. + If this function is called from a decorator (e.g. @sdp.table), note that the returned line + number is affected by how the decorator was triggered - i.e. whether @sdp.table or @sdp.table() + was called. + + Case 1: + |@sdp.table() + |def fn + + @sdp.table() is executed immediately, on line 1. + + Case 2: + |@sdp.table + |def fn + + @sdp.table will expand to fn = sdp.table(fn), evaluated on the line that `fn` is defined on. + This would be line 2. More interestingly, this means: + + |@sdp.table + | + | + |def fn + + will expand to fn = sdp.table(fn) on line 4, where `fn` is defined. + :param stacklevel: The number of stack frames to go up. 0 means the direct caller of this function, 1 means the caller of the caller, and so on. 
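+
+    A hypothetical usage sketch (my_decorator is illustrative only, not part of this module):
+
+    |def my_decorator(fn):
+    |    # stacklevel=1 reports the caller of my_decorator, i.e. the line where the decorator
+    |    # was applied (subject to the @sdp.table vs @sdp.table() caveat described above).
+    |    loc = get_caller_source_code_location(stacklevel=1)
+    |    return fn
+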
""" diff --git a/python/pyspark/pipelines/spark_connect_graph_element_registry.py b/python/pyspark/pipelines/spark_connect_graph_element_registry.py index 8bc4aeefd2264..860cc4fdca92f 100644 --- a/python/pyspark/pipelines/spark_connect_graph_element_registry.py +++ b/python/pyspark/pipelines/spark_connect_graph_element_registry.py @@ -29,6 +29,9 @@ ) from pyspark.pipelines.flow import Flow from pyspark.pipelines.graph_element_registry import GraphElementRegistry +from pyspark.pipelines.source_code_location import ( + SourceCodeLocation +) from typing import Any, cast import pyspark.sql.connect.proto as pb2 @@ -79,6 +82,7 @@ def register_dataset(self, dataset: Dataset) -> None: partition_cols=partition_cols, schema=schema, format=format, + source_code_location=source_code_location_to_proto(dataset.source_code_location), ) command = pb2.Command() command.pipeline_command.define_dataset.CopyFrom(inner_command) @@ -96,6 +100,7 @@ def register_flow(self, flow: Flow) -> None: plan=relation, sql_conf=flow.spark_conf, once=flow.once, + source_code_location=source_code_location_to_proto(flow.source_code_location), ) command = pb2.Command() command.pipeline_command.define_flow.CopyFrom(inner_command) @@ -110,3 +115,10 @@ def register_sql(self, sql_text: str, file_path: Path) -> None: command = pb2.Command() command.pipeline_command.define_sql_graph_elements.CopyFrom(inner_command) self._client.execute_command(command) + +def source_code_location_to_proto( + source_code_location: SourceCodeLocation, +) -> pb2.SourceCodeLocation: + return pb2.SourceCodeLocation( + file_name=source_code_location.filename, line_number=source_code_location.line_number + ) \ No newline at end of file diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py index e75c635a2658a..d89e52ab1ed2a 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.py +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py @@ -40,7 +40,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xe6\x12\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 
\x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xd1\x04\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_format\x1a\xbc\x03\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x30\n\x04plan\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x04plan\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12\x17\n\x04once\x18\x06 \x01(\x08H\x04R\x04once\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x07\n\x05_planB\x07\n\x05_once\x1aQ\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_textB\x0e\n\x0c\x63ommand_type"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 
\x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xcc\x14\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xc4\x05\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x12X\n\x14source_code_location\x18\t \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x06R\x12sourceCodeLocation\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_formatB\x17\n\x15_source_code_location\x1a\xaf\x04\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 
\x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x30\n\x04plan\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x04plan\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12\x17\n\x04once\x18\x06 \x01(\x08H\x04R\x04once\x88\x01\x01\x12X\n\x14source_code_location\x18\x07 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x07\n\x05_planB\x07\n\x05_onceB\x17\n\x15_source_code_location\x1aQ\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_textB\x0e\n\x0c\x63ommand_type"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"z\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 \x01(\x05H\x01R\nlineNumber\x88\x01\x01\x42\x0c\n\n_file_nameB\x0e\n\x0c_line_number*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _globals = globals() @@ -59,10 +59,10 @@ _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_options = b"8\001" _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_DATASETTYPE"]._serialized_start = 3014 - _globals["_DATASETTYPE"]._serialized_end = 3111 + _globals["_DATASETTYPE"]._serialized_start = 3368 + _globals["_DATASETTYPE"]._serialized_end = 3465 _globals["_PIPELINECOMMAND"]._serialized_start = 140 - _globals["_PIPELINECOMMAND"]._serialized_end = 2546 + _globals["_PIPELINECOMMAND"]._serialized_end = 2776 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 719 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1110 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 928 @@ -72,23 +72,25 @@ _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1112 _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1202 _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_start = 1205 - 
_globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 1798 - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_start = 1642 - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_end = 1708 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 1801 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2245 + _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 1913 + _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_start = 1732 + _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_end = 1798 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 1916 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2475 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 928 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 986 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2247 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2328 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2331 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2530 - _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2549 - _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 2819 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2706 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 2804 - _globals["_PIPELINEEVENTRESULT"]._serialized_start = 2821 - _globals["_PIPELINEEVENTRESULT"]._serialized_end = 2894 - _globals["_PIPELINEEVENT"]._serialized_start = 2896 - _globals["_PIPELINEEVENT"]._serialized_end = 3012 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2477 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2558 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2561 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2760 + _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2779 + _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 3049 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2936 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 3034 + _globals["_PIPELINEEVENTRESULT"]._serialized_start = 3051 + _globals["_PIPELINEEVENTRESULT"]._serialized_end = 3124 + _globals["_PIPELINEEVENT"]._serialized_start = 3126 + _globals["_PIPELINEEVENT"]._serialized_end = 3242 + _globals["_SOURCECODELOCATION"]._serialized_start = 3244 + _globals["_SOURCECODELOCATION"]._serialized_end = 3366 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi index cf2cb8d3053b7..1f7df3a50cecf 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi @@ -33,6 +33,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ + import builtins import collections.abc import google.protobuf.descriptor @@ -59,7 +60,7 @@ class _DatasetType: class _DatasetTypeEnumTypeWrapper( google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_DatasetType.ValueType], builtins.type, -): # noqa: F821 +): DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor DATASET_TYPE_UNSPECIFIED: _DatasetType.ValueType # 0 """Safe default value. 
Should not be used.""" @@ -83,16 +84,19 @@ TEMPORARY_VIEW: DatasetType.ValueType # 3 """A view which is not published to the catalog""" global___DatasetType = DatasetType +@typing.final class PipelineCommand(google.protobuf.message.Message): """Dispatch object for pipelines commands. See each individual command for documentation.""" DESCRIPTOR: google.protobuf.descriptor.Descriptor + @typing.final class CreateDataflowGraph(google.protobuf.message.Message): """Request to create a new dataflow graph.""" DESCRIPTOR: google.protobuf.descriptor.Descriptor + @typing.final class SqlConfEntry(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -107,9 +111,10 @@ class PipelineCommand(google.protobuf.message.Message): value: builtins.str = ..., ) -> None: ... def ClearField( - self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] + self, field_name: typing.Literal["key", b"key", "value", b"value"] ) -> None: ... + @typing.final class Response(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -123,7 +128,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -132,7 +137,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -140,9 +145,8 @@ class PipelineCommand(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, - oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], - ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... + self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] + ) -> typing.Literal["dataflow_graph_id"] | None: ... DEFAULT_CATALOG_FIELD_NUMBER: builtins.int DEFAULT_DATABASE_FIELD_NUMBER: builtins.int @@ -165,7 +169,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_default_catalog", b"_default_catalog", "_default_database", @@ -178,7 +182,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_default_catalog", b"_default_catalog", "_default_database", @@ -193,13 +197,14 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_default_catalog", b"_default_catalog"] - ) -> typing_extensions.Literal["default_catalog"] | None: ... + self, oneof_group: typing.Literal["_default_catalog", b"_default_catalog"] + ) -> typing.Literal["default_catalog"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_default_database", b"_default_database"] - ) -> typing_extensions.Literal["default_database"] | None: ... + self, oneof_group: typing.Literal["_default_database", b"_default_database"] + ) -> typing.Literal["default_database"] | None: ... + @typing.final class DropDataflowGraph(google.protobuf.message.Message): """Drops the graph and stops any running attached flows.""" @@ -215,7 +220,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... 
def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -224,7 +229,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -232,15 +237,16 @@ class PipelineCommand(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, - oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], - ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... + self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] + ) -> typing.Literal["dataflow_graph_id"] | None: ... + @typing.final class DefineDataset(google.protobuf.message.Message): """Request to define a dataset: a table, a materialized view, or a temporary view.""" DESCRIPTOR: google.protobuf.descriptor.Descriptor + @typing.final class TablePropertiesEntry(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -255,7 +261,7 @@ class PipelineCommand(google.protobuf.message.Message): value: builtins.str = ..., ) -> None: ... def ClearField( - self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] + self, field_name: typing.Literal["key", b"key", "value", b"value"] ) -> None: ... DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int @@ -266,6 +272,7 @@ class PipelineCommand(google.protobuf.message.Message): PARTITION_COLS_FIELD_NUMBER: builtins.int SCHEMA_FIELD_NUMBER: builtins.int FORMAT_FIELD_NUMBER: builtins.int + SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this dataset to.""" dataset_name: builtins.str @@ -274,6 +281,10 @@ class PipelineCommand(google.protobuf.message.Message): """The type of the dataset.""" comment: builtins.str """Optional comment for the dataset.""" + format: builtins.str + """The output table format of the dataset. Only applies to dataset_type == TABLE and + dataset_type == MATERIALIZED_VIEW. + """ @property def table_properties( self, @@ -289,10 +300,9 @@ class PipelineCommand(google.protobuf.message.Message): @property def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType: """Schema for the dataset. If unset, this will be inferred from incoming flows.""" - format: builtins.str - """The output table format of the dataset. Only applies to dataset_type == TABLE and - dataset_type == MATERIALIZED_VIEW. - """ + @property + def source_code_location(self) -> global___SourceCodeLocation: + """The location in source code that this dataset was defined.""" def __init__( self, *, @@ -304,10 +314,11 @@ class PipelineCommand(google.protobuf.message.Message): partition_cols: collections.abc.Iterable[builtins.str] | None = ..., schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ..., format: builtins.str | None = ..., + source_code_location: global___SourceCodeLocation | None = ..., ) -> None: ... 
def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_comment", b"_comment", "_dataflow_graph_id", @@ -320,6 +331,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_format", "_schema", b"_schema", + "_source_code_location", + b"_source_code_location", "comment", b"comment", "dataflow_graph_id", @@ -332,11 +345,13 @@ class PipelineCommand(google.protobuf.message.Message): b"format", "schema", b"schema", + "source_code_location", + b"source_code_location", ], ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_comment", b"_comment", "_dataflow_graph_id", @@ -349,6 +364,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_format", "_schema", b"_schema", + "_source_code_location", + b"_source_code_location", "comment", b"comment", "dataflow_graph_id", @@ -363,41 +380,48 @@ class PipelineCommand(google.protobuf.message.Message): b"partition_cols", "schema", b"schema", + "source_code_location", + b"source_code_location", "table_properties", b"table_properties", ], ) -> None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_comment", b"_comment"] - ) -> typing_extensions.Literal["comment"] | None: ... + self, oneof_group: typing.Literal["_comment", b"_comment"] + ) -> typing.Literal["comment"] | None: ... @typing.overload def WhichOneof( - self, - oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], - ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... + self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] + ) -> typing.Literal["dataflow_graph_id"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_dataset_name", b"_dataset_name"] - ) -> typing_extensions.Literal["dataset_name"] | None: ... + self, oneof_group: typing.Literal["_dataset_name", b"_dataset_name"] + ) -> typing.Literal["dataset_name"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_dataset_type", b"_dataset_type"] - ) -> typing_extensions.Literal["dataset_type"] | None: ... + self, oneof_group: typing.Literal["_dataset_type", b"_dataset_type"] + ) -> typing.Literal["dataset_type"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_format", b"_format"] - ) -> typing_extensions.Literal["format"] | None: ... + self, oneof_group: typing.Literal["_format", b"_format"] + ) -> typing.Literal["format"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_schema", b"_schema"] - ) -> typing_extensions.Literal["schema"] | None: ... + self, oneof_group: typing.Literal["_schema", b"_schema"] + ) -> typing.Literal["schema"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing.Literal["_source_code_location", b"_source_code_location"] + ) -> typing.Literal["source_code_location"] | None: ... + @typing.final class DefineFlow(google.protobuf.message.Message): """Request to define a flow targeting a dataset.""" DESCRIPTOR: google.protobuf.descriptor.Descriptor + @typing.final class SqlConfEntry(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -412,7 +436,7 @@ class PipelineCommand(google.protobuf.message.Message): value: builtins.str = ..., ) -> None: ... 
def ClearField( - self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] + self, field_name: typing.Literal["key", b"key", "value", b"value"] ) -> None: ... DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int @@ -421,12 +445,15 @@ class PipelineCommand(google.protobuf.message.Message): PLAN_FIELD_NUMBER: builtins.int SQL_CONF_FIELD_NUMBER: builtins.int ONCE_FIELD_NUMBER: builtins.int + SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this flow to.""" flow_name: builtins.str """Name of the flow. For standalone flows, this must be a single-part name.""" target_dataset_name: builtins.str """Name of the dataset this flow writes to. Can be partially or fully qualified.""" + once: builtins.bool + """If true, this flow will only be run once per full refresh.""" @property def plan(self) -> pyspark.sql.connect.proto.relations_pb2.Relation: """An unresolved relation that defines the dataset's flow.""" @@ -435,8 +462,9 @@ class PipelineCommand(google.protobuf.message.Message): self, ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: """SQL configurations set when running this flow.""" - once: builtins.bool - """If true, this flow will only be run once per full refresh.""" + @property + def source_code_location(self) -> global___SourceCodeLocation: + """The location in source code that this flow was defined.""" def __init__( self, *, @@ -446,10 +474,11 @@ class PipelineCommand(google.protobuf.message.Message): plan: pyspark.sql.connect.proto.relations_pb2.Relation | None = ..., sql_conf: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., once: builtins.bool | None = ..., + source_code_location: global___SourceCodeLocation | None = ..., ) -> None: ... def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "_flow_name", @@ -458,6 +487,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_once", "_plan", b"_plan", + "_source_code_location", + b"_source_code_location", "_target_dataset_name", b"_target_dataset_name", "dataflow_graph_id", @@ -468,13 +499,15 @@ class PipelineCommand(google.protobuf.message.Message): b"once", "plan", b"plan", + "source_code_location", + b"source_code_location", "target_dataset_name", b"target_dataset_name", ], ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "_flow_name", @@ -483,6 +516,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_once", "_plan", b"_plan", + "_source_code_location", + b"_source_code_location", "_target_dataset_name", b"_target_dataset_name", "dataflow_graph_id", @@ -493,6 +528,8 @@ class PipelineCommand(google.protobuf.message.Message): b"once", "plan", b"plan", + "source_code_location", + b"source_code_location", "sql_conf", b"sql_conf", "target_dataset_name", @@ -501,27 +538,30 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... @typing.overload def WhichOneof( - self, - oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], - ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... + self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] + ) -> typing.Literal["dataflow_graph_id"] | None: ... 
@typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_flow_name", b"_flow_name"] - ) -> typing_extensions.Literal["flow_name"] | None: ... + self, oneof_group: typing.Literal["_flow_name", b"_flow_name"] + ) -> typing.Literal["flow_name"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_once", b"_once"] - ) -> typing_extensions.Literal["once"] | None: ... + self, oneof_group: typing.Literal["_once", b"_once"] + ) -> typing.Literal["once"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_plan", b"_plan"] - ) -> typing_extensions.Literal["plan"] | None: ... + self, oneof_group: typing.Literal["_plan", b"_plan"] + ) -> typing.Literal["plan"] | None: ... @typing.overload def WhichOneof( - self, - oneof_group: typing_extensions.Literal["_target_dataset_name", b"_target_dataset_name"], - ) -> typing_extensions.Literal["target_dataset_name"] | None: ... + self, oneof_group: typing.Literal["_source_code_location", b"_source_code_location"] + ) -> typing.Literal["source_code_location"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing.Literal["_target_dataset_name", b"_target_dataset_name"] + ) -> typing.Literal["target_dataset_name"] | None: ... + @typing.final class StartRun(google.protobuf.message.Message): """Resolves all datasets and flows and start a pipeline update. Should be called after all graph elements are registered. @@ -539,7 +579,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -548,7 +588,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -556,10 +596,10 @@ class PipelineCommand(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, - oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], - ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... + self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] + ) -> typing.Literal["dataflow_graph_id"] | None: ... + @typing.final class DefineSqlGraphElements(google.protobuf.message.Message): """Parses the SQL file and registers all datasets and flows.""" @@ -583,7 +623,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "_sql_file_path", @@ -600,7 +640,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "_sql_file_path", @@ -617,17 +657,16 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... @typing.overload def WhichOneof( - self, - oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], - ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... + self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] + ) -> typing.Literal["dataflow_graph_id"] | None: ... 
@typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_sql_file_path", b"_sql_file_path"] - ) -> typing_extensions.Literal["sql_file_path"] | None: ... + self, oneof_group: typing.Literal["_sql_file_path", b"_sql_file_path"] + ) -> typing.Literal["sql_file_path"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing_extensions.Literal["_sql_text", b"_sql_text"] - ) -> typing_extensions.Literal["sql_text"] | None: ... + self, oneof_group: typing.Literal["_sql_text", b"_sql_text"] + ) -> typing.Literal["sql_text"] | None: ... CREATE_DATAFLOW_GRAPH_FIELD_NUMBER: builtins.int DEFINE_DATASET_FIELD_NUMBER: builtins.int @@ -659,7 +698,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "command_type", b"command_type", "create_dataflow_graph", @@ -678,7 +717,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "command_type", b"command_type", "create_dataflow_graph", @@ -696,9 +735,9 @@ class PipelineCommand(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, oneof_group: typing_extensions.Literal["command_type", b"command_type"] + self, oneof_group: typing.Literal["command_type", b"command_type"] ) -> ( - typing_extensions.Literal[ + typing.Literal[ "create_dataflow_graph", "define_dataset", "define_flow", @@ -711,11 +750,13 @@ class PipelineCommand(google.protobuf.message.Message): global___PipelineCommand = PipelineCommand +@typing.final class PipelineCommandResult(google.protobuf.message.Message): """Dispatch object for pipelines command results.""" DESCRIPTOR: google.protobuf.descriptor.Descriptor + @typing.final class CreateDataflowGraphResult(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -729,7 +770,7 @@ class PipelineCommandResult(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -738,7 +779,7 @@ class PipelineCommandResult(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -746,9 +787,8 @@ class PipelineCommandResult(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, - oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], - ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... + self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] + ) -> typing.Literal["dataflow_graph_id"] | None: ... CREATE_DATAFLOW_GRAPH_RESULT_FIELD_NUMBER: builtins.int @property @@ -763,7 +803,7 @@ class PipelineCommandResult(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "create_dataflow_graph_result", b"create_dataflow_graph_result", "result_type", @@ -772,7 +812,7 @@ class PipelineCommandResult(google.protobuf.message.Message): ) -> builtins.bool: ... 
def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "create_dataflow_graph_result", b"create_dataflow_graph_result", "result_type", @@ -780,11 +820,12 @@ class PipelineCommandResult(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, oneof_group: typing_extensions.Literal["result_type", b"result_type"] - ) -> typing_extensions.Literal["create_dataflow_graph_result"] | None: ... + self, oneof_group: typing.Literal["result_type", b"result_type"] + ) -> typing.Literal["create_dataflow_graph_result"] | None: ... global___PipelineCommandResult = PipelineCommandResult +@typing.final class PipelineEventResult(google.protobuf.message.Message): """A response containing an event emitted during the run of a pipeline.""" @@ -798,23 +839,22 @@ class PipelineEventResult(google.protobuf.message.Message): *, event: global___PipelineEvent | None = ..., ) -> None: ... - def HasField( - self, field_name: typing_extensions.Literal["event", b"event"] - ) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["event", b"event"]) -> None: ... + def HasField(self, field_name: typing.Literal["event", b"event"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["event", b"event"]) -> None: ... global___PipelineEventResult = PipelineEventResult +@typing.final class PipelineEvent(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor TIMESTAMP_FIELD_NUMBER: builtins.int MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + """The message that should be displayed to users.""" @property def timestamp(self) -> google.protobuf.timestamp_pb2.Timestamp: """The timestamp corresponding to when the event occurred.""" - message: builtins.str - """The message that should be displayed to users.""" def __init__( self, *, @@ -823,18 +863,73 @@ class PipelineEvent(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_message", b"_message", "message", b"message", "timestamp", b"timestamp" ], ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal[ + field_name: typing.Literal[ "_message", b"_message", "message", b"message", "timestamp", b"timestamp" ], ) -> None: ... def WhichOneof( - self, oneof_group: typing_extensions.Literal["_message", b"_message"] - ) -> typing_extensions.Literal["message"] | None: ... + self, oneof_group: typing.Literal["_message", b"_message"] + ) -> typing.Literal["message"] | None: ... global___PipelineEvent = PipelineEvent + +@typing.final +class SourceCodeLocation(google.protobuf.message.Message): + """Source code location information associated with a particular dataset or flow.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + FILE_NAME_FIELD_NUMBER: builtins.int + LINE_NUMBER_FIELD_NUMBER: builtins.int + file_name: builtins.str + """The file that this pipeline source code was defined in.""" + line_number: builtins.int + """The specific line number that this pipeline source code is located at, if applicable.""" + def __init__( + self, + *, + file_name: builtins.str | None = ..., + line_number: builtins.int | None = ..., + ) -> None: ... + def HasField( + self, + field_name: typing.Literal[ + "_file_name", + b"_file_name", + "_line_number", + b"_line_number", + "file_name", + b"file_name", + "line_number", + b"line_number", + ], + ) -> builtins.bool: ... 
+ def ClearField( + self, + field_name: typing.Literal[ + "_file_name", + b"_file_name", + "_line_number", + b"_line_number", + "file_name", + b"file_name", + "line_number", + b"line_number", + ], + ) -> None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing.Literal["_file_name", b"_file_name"] + ) -> typing.Literal["file_name"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing.Literal["_line_number", b"_line_number"] + ) -> typing.Literal["line_number"] | None: ... + +global___SourceCodeLocation = SourceCodeLocation diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index f4f1d3b043d33..5279e046a6cb4 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -88,6 +88,9 @@ message PipelineCommand { // The output table format of the dataset. Only applies to dataset_type == TABLE and // dataset_type == MATERIALIZED_VIEW. optional string format = 8; + + // The location in source code that this dataset was defined. + optional SourceCodeLocation source_code_location = 9; } // Request to define a flow targeting a dataset. @@ -109,6 +112,9 @@ message PipelineCommand { // If true, this flow will only be run once per full refresh. optional bool once = 6; + + // The location in source code that this flow was defined. + optional SourceCodeLocation source_code_location = 7; } // Resolves all datasets and flows and start a pipeline update. Should be called after all @@ -166,3 +172,11 @@ message PipelineEvent { // The message that should be displayed to users. optional string message = 2; } + +// Source code location information associated with a particular dataset or flow. +message SourceCodeLocation { + // The file that this pipeline source code was defined in. + optional string file_name = 1; + // The specific line number that this pipeline source code is located at, if applicable. 
+ optional int32 line_number = 2; +} diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index 92cb5bcac4ee0..a1bcfa9dba498 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -156,6 +156,10 @@ private[connect] object PipelinesHandler extends Logging { .filter(_.nonEmpty), properties = dataset.getTablePropertiesMap.asScala.toMap, baseOrigin = QueryOrigin( + filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)( + dataset.getSourceCodeLocation.getFileName), + line = Option.when(dataset.getSourceCodeLocation.hasLineNumber)( + dataset.getSourceCodeLocation.getLineNumber), objectType = Option(QueryOriginType.Table.toString), objectName = Option(tableIdentifier.unquotedString), language = Option(Python())), @@ -171,6 +175,10 @@ private[connect] object PipelinesHandler extends Logging { identifier = viewIdentifier, comment = Option(dataset.getComment), origin = QueryOrigin( + filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)( + dataset.getSourceCodeLocation.getFileName), + line = Option.when(dataset.getSourceCodeLocation.hasLineNumber)( + dataset.getSourceCodeLocation.getLineNumber), objectType = Option(QueryOriginType.View.toString), objectName = Option(viewIdentifier.unquotedString), language = Option(Python())), @@ -214,6 +222,10 @@ private[connect] object PipelinesHandler extends Logging { Option(graphElementRegistry.defaultDatabase)), comment = None, origin = QueryOrigin( + filePath = Option.when(flow.getSourceCodeLocation.hasFileName)( + flow.getSourceCodeLocation.getFileName), + line = Option.when(flow.getSourceCodeLocation.hasLineNumber)( + flow.getSourceCodeLocation.getLineNumber), objectType = Option(QueryOriginType.Flow.toString), objectName = Option(flowIdentifier.unquotedString), language = Option(Python())))) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 21f2857090182..23d1401da5fc6 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -28,7 +28,11 @@ import scala.util.Try import org.apache.spark.api.python.PythonUtils import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.pipelines.graph.DataflowGraph +import org.apache.spark.sql.pipelines.Language.Python +import org.apache.spark.sql.pipelines.QueryOriginType +import org.apache.spark.sql.pipelines.common.FlowStatus +import org.apache.spark.sql.pipelines.graph.{DataflowGraph, QueryOrigin} +import org.apache.spark.sql.pipelines.logging.EventLevel import org.apache.spark.sql.pipelines.utils.{EventVerificationTestHelpers, TestPipelineUpdateContextMixin} /** @@ -101,6 +105,137 @@ class PythonPipelineSuite assert(graph.tables.size == 1) } + test("failed flow progress event has correct python source code location") { + // Note that pythonText will be inserted into line 26 of the python script that is run. 
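+    // Since pythonText begins with a newline, the @sdp.table() decorator below ends up on
+    // line 27 of that script, which is the line number asserted in the flow's query origin.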
+ val unresolvedGraph = buildGraph(pythonText = """ + |@sdp.table() + |def table1(): + | df = spark.createDataFrame([(25,), (30,), (45,)], ["age"]) + | return df.select("name") + |""".stripMargin) + + val updateContext = TestPipelineUpdateContext(spark, unresolvedGraph) + updateContext.pipelineExecution.runPipeline() + + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("table1"), + expectedFlowStatus = FlowStatus.FAILED, + cond = flowProgressEvent => flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(27), + objectName = Option("spark_catalog.default.table1"), + objectType = Option(QueryOriginType.Flow.toString) + ) + ), + errorChecker = ex => ex.getMessage.contains( + "A column, variable, or function parameter with name `name` cannot be resolved."), + expectedEventLevel = EventLevel.WARN + ) + } + + test("flow progress events have correct python source code location") { + val unresolvedGraph = buildGraph(pythonText = """ + |@sdp.table( + | comment = 'my table' + |) + |def table1(): + | return spark.readStream.table('mv') + | + |@sdp.materialized_view + |def mv2(): + | return spark.range(26, 29) + | + |@sdp.materialized_view + |def mv(): + | df = spark.createDataFrame([(25,), (30,), (45,)], ["age"]) + | return df.select("age") + | + |@sdp.append_flow( + | target = 'table1' + |) + |def standalone_flow1(): + | return spark.readStream.table('mv2') + |""".stripMargin) + + val updateContext = TestPipelineUpdateContext(spark, unresolvedGraph) + updateContext.pipelineExecution.runPipeline() + updateContext.pipelineExecution.awaitCompletion() + + Seq(FlowStatus.QUEUED, FlowStatus.STARTING, + FlowStatus.PLANNING, FlowStatus.RUNNING, FlowStatus.COMPLETED).foreach { flowStatus => + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("mv2"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(34), + objectName = Option("spark_catalog.default.mv2"), + objectType = Option(QueryOriginType.Flow.toString) + ) + ), + expectedEventLevel = EventLevel.INFO + ) + + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("mv"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(38), + objectName = Option("spark_catalog.default.mv"), + objectType = Option(QueryOriginType.Flow.toString) + ) + ), + expectedEventLevel = EventLevel.INFO + ) + } + + // Note that streaming flows do not have a PLANNING phase. 
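+    // table1 and standalone_flow1 both read with spark.readStream, so they are verified
+    // against the streaming status sequence below.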
+ Seq(FlowStatus.QUEUED, FlowStatus.STARTING, FlowStatus.RUNNING, FlowStatus.COMPLETED).foreach { + flowStatus => + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("table1"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(27), + objectName = Option("spark_catalog.default.table1"), + objectType = Option(QueryOriginType.Flow.toString) + ) + ), + expectedEventLevel = EventLevel.INFO + ) + + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("standalone_flow1"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(42), + objectName = Option("spark_catalog.default.standalone_flow1"), + objectType = Option(QueryOriginType.Flow.toString) + ) + ), + expectedEventLevel = EventLevel.INFO + ) + } + } + test("basic with inverted topological order") { // This graph is purposefully in the wrong topological order to test the topological sort val graph = buildGraph(""" diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala index d33924c2e1c37..a1b029b653842 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala @@ -129,7 +129,8 @@ private class FlowResolver(rawGraph: DataflowGraph) { allInputs = allInputs, availableInputs = availableResolvedInputs.values.toList, configuration = flowToResolve.sqlConf, - queryContext = flowToResolve.queryContext + queryContext = flowToResolve.queryContext, + queryOrigin = flowToResolve.origin ) val result = flowFunctionResult match { @@ -175,7 +176,8 @@ private class FlowResolver(rawGraph: DataflowGraph) { allInputs = allInputs, availableInputs = availableResolvedInputs.values.toList, configuration = newSqlConf, - queryContext = flowToResolve.queryContext + queryContext = flowToResolve.queryContext, + queryOrigin = flowToResolve.origin ) } else { f diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala index 2378b6f8d96a6..0948c100b86b1 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala @@ -75,13 +75,16 @@ trait FlowFunction extends Logging { * @param availableInputs the list of all [[Input]]s available to this flow * @param configuration the spark configurations that apply to this flow. * @param queryContext The context of the query being evaluated. + * @param queryOrigin The source code location of the flow definition this flow function was + * instantiated from. 
* @return the inputs actually used, and the DataFrame expression for the flow */ def call( allInputs: Set[TableIdentifier], availableInputs: Seq[Input], configuration: Map[String, String], - queryContext: QueryContext + queryContext: QueryContext, + queryOrigin: QueryOrigin ): FlowFunctionResult } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala index 7e2e97f2b5d74..984e7d3925b49 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala @@ -46,13 +46,15 @@ object FlowAnalysis { allInputs: Set[TableIdentifier], availableInputs: Seq[Input], confs: Map[String, String], - queryContext: QueryContext + queryContext: QueryContext, + queryOrigin: QueryOrigin ): FlowFunctionResult = { val ctx = FlowAnalysisContext( allInputs = allInputs, availableInputs = availableInputs, queryContext = queryContext, - spark = SparkSession.active + spark = SparkSession.active, + queryOrigin = queryOrigin ) val df = try { confs.foreach { case (k, v) => ctx.setConf(k, v) } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala index 1139946df59ac..fa8689e5ad608 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala @@ -46,7 +46,8 @@ private[pipelines] case class FlowAnalysisContext( shouldLowerCaseNames: Boolean = false, analysisWarnings: mutable.Buffer[AnalysisWarning] = new ListBuffer[AnalysisWarning], spark: SparkSession, - externalInputs: mutable.HashSet[TableIdentifier] = mutable.HashSet.empty + externalInputs: mutable.HashSet[TableIdentifier] = mutable.HashSet.empty, + queryOrigin: QueryOrigin ) { /** Map from `Input` name to the actual `Input` */ diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala index 0e2ba42b15e59..1ad327b1e6c38 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala @@ -49,35 +49,54 @@ class GraphRegistrationContext( flows += flowDef.copy(sqlConf = defaultSqlConf ++ flowDef.sqlConf) } + /** + * Collects all graph registered entities (tables, views, flows) into a DataflowGraph object, + * for graph analysis/resolution/execution. Also responsible for fully qualifying any partially + * qualified user-specified identifiers using the pipeline-level catalog and database. If + * identifiers have already been qualified during graph element registration, that qualification + * is respected. 
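+   * For example, with defaultCatalog "spark_catalog" and defaultDatabase "default", a table
+   * registered as `mv` is qualified to `spark_catalog.default.mv`.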
+ */ def toDataflowGraph: DataflowGraph = { val qualifiedTables = tables.toSeq.map { t => + val fullyQualifiedTableIdentifier = GraphIdentifierManager + .parseAndQualifyTableIdentifier( + rawTableIdentifier = t.identifier, + currentCatalog = Some(defaultCatalog), + currentDatabase = Some(defaultDatabase) + ) + .identifier t.copy( - identifier = GraphIdentifierManager - .parseAndQualifyTableIdentifier( - rawTableIdentifier = t.identifier, - currentCatalog = Some(defaultCatalog), - currentDatabase = Some(defaultDatabase) - ) - .identifier + identifier = fullyQualifiedTableIdentifier, + baseOrigin = t.baseOrigin.copy( + objectName = Option(fullyQualifiedTableIdentifier.unquotedString) + ) ) } val validatedViews = views.toSeq.collect { case v: TemporaryView => + val parsedAndValidatedTemporaryViewIdentifier = GraphIdentifierManager + .parseAndValidateTemporaryViewIdentifier( + rawViewIdentifier = v.identifier + ) v.copy( - identifier = GraphIdentifierManager - .parseAndValidateTemporaryViewIdentifier( - rawViewIdentifier = v.identifier - ) + identifier = parsedAndValidatedTemporaryViewIdentifier, + origin = v.origin.copy( + objectName = Option(parsedAndValidatedTemporaryViewIdentifier.unquotedString) + ) ) case v: PersistedView => + val fullyQualifiedPersistedViewIdentifier = GraphIdentifierManager + .parseAndValidatePersistedViewIdentifier( + rawViewIdentifier = v.identifier, + currentCatalog = Some(defaultCatalog), + currentDatabase = Some(defaultDatabase) + ) v.copy( - identifier = GraphIdentifierManager - .parseAndValidatePersistedViewIdentifier( - rawViewIdentifier = v.identifier, - currentCatalog = Some(defaultCatalog), - currentDatabase = Some(defaultDatabase) - ) + identifier = fullyQualifiedPersistedViewIdentifier, + origin = v.origin.copy( + objectName = Option(fullyQualifiedPersistedViewIdentifier.unquotedString) + ) ) } @@ -94,21 +113,25 @@ class GraphRegistrationContext( if (isImplicitFlow && flowWritesToView) { f } else { + val fullyQualifiedFlowIdentifier = GraphIdentifierManager + .parseAndQualifyFlowIdentifier( + rawFlowIdentifier = f.identifier, + currentCatalog = Some(defaultCatalog), + currentDatabase = Some(defaultDatabase) + ) + .identifier f.copy( - identifier = GraphIdentifierManager - .parseAndQualifyFlowIdentifier( - rawFlowIdentifier = f.identifier, - currentCatalog = Some(defaultCatalog), - currentDatabase = Some(defaultDatabase) - ) - .identifier, + identifier = fullyQualifiedFlowIdentifier, destinationIdentifier = GraphIdentifierManager .parseAndQualifyFlowIdentifier( rawFlowIdentifier = f.destinationIdentifier, currentCatalog = Some(defaultCatalog), currentDatabase = Some(defaultDatabase) ) - .identifier + .identifier, + origin = f.origin.copy( + objectName = Option(fullyQualifiedFlowIdentifier.unquotedString) + ) ) } } From 18bccc37b84e2eb8374fa08c55afda28900ce99c Mon Sep 17 00:00:00 2001 From: anishm-db Date: Tue, 1 Jul 2025 21:51:11 +0000 Subject: [PATCH 2/4] regen with python >= 3.10 --- .../sql/connect/proto/pipelines_pb2.pyi | 217 +++++++++--------- 1 file changed, 108 insertions(+), 109 deletions(-) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi index 1f7df3a50cecf..42387297f1a09 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi @@ -33,7 +33,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
""" - import builtins import collections.abc import google.protobuf.descriptor @@ -60,7 +59,7 @@ class _DatasetType: class _DatasetTypeEnumTypeWrapper( google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_DatasetType.ValueType], builtins.type, -): +): # noqa: F821 DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor DATASET_TYPE_UNSPECIFIED: _DatasetType.ValueType # 0 """Safe default value. Should not be used.""" @@ -84,19 +83,16 @@ TEMPORARY_VIEW: DatasetType.ValueType # 3 """A view which is not published to the catalog""" global___DatasetType = DatasetType -@typing.final class PipelineCommand(google.protobuf.message.Message): """Dispatch object for pipelines commands. See each individual command for documentation.""" DESCRIPTOR: google.protobuf.descriptor.Descriptor - @typing.final class CreateDataflowGraph(google.protobuf.message.Message): """Request to create a new dataflow graph.""" DESCRIPTOR: google.protobuf.descriptor.Descriptor - @typing.final class SqlConfEntry(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -111,10 +107,9 @@ class PipelineCommand(google.protobuf.message.Message): value: builtins.str = ..., ) -> None: ... def ClearField( - self, field_name: typing.Literal["key", b"key", "value", b"value"] + self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] ) -> None: ... - @typing.final class Response(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -128,7 +123,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -137,7 +132,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -145,8 +140,9 @@ class PipelineCommand(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] - ) -> typing.Literal["dataflow_graph_id"] | None: ... + self, + oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], + ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... DEFAULT_CATALOG_FIELD_NUMBER: builtins.int DEFAULT_DATABASE_FIELD_NUMBER: builtins.int @@ -169,7 +165,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_default_catalog", b"_default_catalog", "_default_database", @@ -182,7 +178,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_default_catalog", b"_default_catalog", "_default_database", @@ -197,14 +193,13 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_default_catalog", b"_default_catalog"] - ) -> typing.Literal["default_catalog"] | None: ... + self, oneof_group: typing_extensions.Literal["_default_catalog", b"_default_catalog"] + ) -> typing_extensions.Literal["default_catalog"] | None: ... 
@typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_default_database", b"_default_database"] - ) -> typing.Literal["default_database"] | None: ... + self, oneof_group: typing_extensions.Literal["_default_database", b"_default_database"] + ) -> typing_extensions.Literal["default_database"] | None: ... - @typing.final class DropDataflowGraph(google.protobuf.message.Message): """Drops the graph and stops any running attached flows.""" @@ -220,7 +215,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -229,7 +224,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -237,16 +232,15 @@ class PipelineCommand(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] - ) -> typing.Literal["dataflow_graph_id"] | None: ... + self, + oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], + ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... - @typing.final class DefineDataset(google.protobuf.message.Message): """Request to define a dataset: a table, a materialized view, or a temporary view.""" DESCRIPTOR: google.protobuf.descriptor.Descriptor - @typing.final class TablePropertiesEntry(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -261,7 +255,7 @@ class PipelineCommand(google.protobuf.message.Message): value: builtins.str = ..., ) -> None: ... def ClearField( - self, field_name: typing.Literal["key", b"key", "value", b"value"] + self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] ) -> None: ... DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int @@ -281,10 +275,6 @@ class PipelineCommand(google.protobuf.message.Message): """The type of the dataset.""" comment: builtins.str """Optional comment for the dataset.""" - format: builtins.str - """The output table format of the dataset. Only applies to dataset_type == TABLE and - dataset_type == MATERIALIZED_VIEW. - """ @property def table_properties( self, @@ -300,6 +290,10 @@ class PipelineCommand(google.protobuf.message.Message): @property def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType: """Schema for the dataset. If unset, this will be inferred from incoming flows.""" + format: builtins.str + """The output table format of the dataset. Only applies to dataset_type == TABLE and + dataset_type == MATERIALIZED_VIEW. + """ @property def source_code_location(self) -> global___SourceCodeLocation: """The location in source code that this dataset was defined.""" @@ -318,7 +312,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_comment", b"_comment", "_dataflow_graph_id", @@ -351,7 +345,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_comment", b"_comment", "_dataflow_graph_id", @@ -388,40 +382,42 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... 
@typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_comment", b"_comment"] - ) -> typing.Literal["comment"] | None: ... + self, oneof_group: typing_extensions.Literal["_comment", b"_comment"] + ) -> typing_extensions.Literal["comment"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] - ) -> typing.Literal["dataflow_graph_id"] | None: ... + self, + oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], + ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_dataset_name", b"_dataset_name"] - ) -> typing.Literal["dataset_name"] | None: ... + self, oneof_group: typing_extensions.Literal["_dataset_name", b"_dataset_name"] + ) -> typing_extensions.Literal["dataset_name"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_dataset_type", b"_dataset_type"] - ) -> typing.Literal["dataset_type"] | None: ... + self, oneof_group: typing_extensions.Literal["_dataset_type", b"_dataset_type"] + ) -> typing_extensions.Literal["dataset_type"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_format", b"_format"] - ) -> typing.Literal["format"] | None: ... + self, oneof_group: typing_extensions.Literal["_format", b"_format"] + ) -> typing_extensions.Literal["format"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_schema", b"_schema"] - ) -> typing.Literal["schema"] | None: ... + self, oneof_group: typing_extensions.Literal["_schema", b"_schema"] + ) -> typing_extensions.Literal["schema"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_source_code_location", b"_source_code_location"] - ) -> typing.Literal["source_code_location"] | None: ... + self, + oneof_group: typing_extensions.Literal[ + "_source_code_location", b"_source_code_location" + ], + ) -> typing_extensions.Literal["source_code_location"] | None: ... - @typing.final class DefineFlow(google.protobuf.message.Message): """Request to define a flow targeting a dataset.""" DESCRIPTOR: google.protobuf.descriptor.Descriptor - @typing.final class SqlConfEntry(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -436,7 +432,7 @@ class PipelineCommand(google.protobuf.message.Message): value: builtins.str = ..., ) -> None: ... def ClearField( - self, field_name: typing.Literal["key", b"key", "value", b"value"] + self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] ) -> None: ... DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int @@ -452,8 +448,6 @@ class PipelineCommand(google.protobuf.message.Message): """Name of the flow. For standalone flows, this must be a single-part name.""" target_dataset_name: builtins.str """Name of the dataset this flow writes to. 
Can be partially or fully qualified.""" - once: builtins.bool - """If true, this flow will only be run once per full refresh.""" @property def plan(self) -> pyspark.sql.connect.proto.relations_pb2.Relation: """An unresolved relation that defines the dataset's flow.""" @@ -462,6 +456,8 @@ class PipelineCommand(google.protobuf.message.Message): self, ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: """SQL configurations set when running this flow.""" + once: builtins.bool + """If true, this flow will only be run once per full refresh.""" @property def source_code_location(self) -> global___SourceCodeLocation: """The location in source code that this flow was defined.""" @@ -478,7 +474,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "_flow_name", @@ -507,7 +503,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "_flow_name", @@ -538,30 +534,34 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] - ) -> typing.Literal["dataflow_graph_id"] | None: ... + self, + oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], + ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_flow_name", b"_flow_name"] - ) -> typing.Literal["flow_name"] | None: ... + self, oneof_group: typing_extensions.Literal["_flow_name", b"_flow_name"] + ) -> typing_extensions.Literal["flow_name"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_once", b"_once"] - ) -> typing.Literal["once"] | None: ... + self, oneof_group: typing_extensions.Literal["_once", b"_once"] + ) -> typing_extensions.Literal["once"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_plan", b"_plan"] - ) -> typing.Literal["plan"] | None: ... + self, oneof_group: typing_extensions.Literal["_plan", b"_plan"] + ) -> typing_extensions.Literal["plan"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_source_code_location", b"_source_code_location"] - ) -> typing.Literal["source_code_location"] | None: ... + self, + oneof_group: typing_extensions.Literal[ + "_source_code_location", b"_source_code_location" + ], + ) -> typing_extensions.Literal["source_code_location"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_target_dataset_name", b"_target_dataset_name"] - ) -> typing.Literal["target_dataset_name"] | None: ... + self, + oneof_group: typing_extensions.Literal["_target_dataset_name", b"_target_dataset_name"], + ) -> typing_extensions.Literal["target_dataset_name"] | None: ... - @typing.final class StartRun(google.protobuf.message.Message): """Resolves all datasets and flows and start a pipeline update. Should be called after all graph elements are registered. @@ -579,7 +579,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... 
def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -588,7 +588,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -596,10 +596,10 @@ class PipelineCommand(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] - ) -> typing.Literal["dataflow_graph_id"] | None: ... + self, + oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], + ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... - @typing.final class DefineSqlGraphElements(google.protobuf.message.Message): """Parses the SQL file and registers all datasets and flows.""" @@ -623,7 +623,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "_sql_file_path", @@ -640,7 +640,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "_sql_file_path", @@ -657,16 +657,17 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] - ) -> typing.Literal["dataflow_graph_id"] | None: ... + self, + oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], + ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_sql_file_path", b"_sql_file_path"] - ) -> typing.Literal["sql_file_path"] | None: ... + self, oneof_group: typing_extensions.Literal["_sql_file_path", b"_sql_file_path"] + ) -> typing_extensions.Literal["sql_file_path"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_sql_text", b"_sql_text"] - ) -> typing.Literal["sql_text"] | None: ... + self, oneof_group: typing_extensions.Literal["_sql_text", b"_sql_text"] + ) -> typing_extensions.Literal["sql_text"] | None: ... CREATE_DATAFLOW_GRAPH_FIELD_NUMBER: builtins.int DEFINE_DATASET_FIELD_NUMBER: builtins.int @@ -698,7 +699,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "command_type", b"command_type", "create_dataflow_graph", @@ -717,7 +718,7 @@ class PipelineCommand(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "command_type", b"command_type", "create_dataflow_graph", @@ -735,9 +736,9 @@ class PipelineCommand(google.protobuf.message.Message): ], ) -> None: ... 
def WhichOneof( - self, oneof_group: typing.Literal["command_type", b"command_type"] + self, oneof_group: typing_extensions.Literal["command_type", b"command_type"] ) -> ( - typing.Literal[ + typing_extensions.Literal[ "create_dataflow_graph", "define_dataset", "define_flow", @@ -750,13 +751,11 @@ class PipelineCommand(google.protobuf.message.Message): global___PipelineCommand = PipelineCommand -@typing.final class PipelineCommandResult(google.protobuf.message.Message): """Dispatch object for pipelines command results.""" DESCRIPTOR: google.protobuf.descriptor.Descriptor - @typing.final class CreateDataflowGraphResult(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -770,7 +769,7 @@ class PipelineCommandResult(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -779,7 +778,7 @@ class PipelineCommandResult(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_dataflow_graph_id", b"_dataflow_graph_id", "dataflow_graph_id", @@ -787,8 +786,9 @@ class PipelineCommandResult(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, oneof_group: typing.Literal["_dataflow_graph_id", b"_dataflow_graph_id"] - ) -> typing.Literal["dataflow_graph_id"] | None: ... + self, + oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"], + ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ... CREATE_DATAFLOW_GRAPH_RESULT_FIELD_NUMBER: builtins.int @property @@ -803,7 +803,7 @@ class PipelineCommandResult(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "create_dataflow_graph_result", b"create_dataflow_graph_result", "result_type", @@ -812,7 +812,7 @@ class PipelineCommandResult(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "create_dataflow_graph_result", b"create_dataflow_graph_result", "result_type", @@ -820,12 +820,11 @@ class PipelineCommandResult(google.protobuf.message.Message): ], ) -> None: ... def WhichOneof( - self, oneof_group: typing.Literal["result_type", b"result_type"] - ) -> typing.Literal["create_dataflow_graph_result"] | None: ... + self, oneof_group: typing_extensions.Literal["result_type", b"result_type"] + ) -> typing_extensions.Literal["create_dataflow_graph_result"] | None: ... global___PipelineCommandResult = PipelineCommandResult -@typing.final class PipelineEventResult(google.protobuf.message.Message): """A response containing an event emitted during the run of a pipeline.""" @@ -839,22 +838,23 @@ class PipelineEventResult(google.protobuf.message.Message): *, event: global___PipelineEvent | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["event", b"event"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["event", b"event"]) -> None: ... + def HasField( + self, field_name: typing_extensions.Literal["event", b"event"] + ) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["event", b"event"]) -> None: ... 
global___PipelineEventResult = PipelineEventResult -@typing.final class PipelineEvent(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor TIMESTAMP_FIELD_NUMBER: builtins.int MESSAGE_FIELD_NUMBER: builtins.int - message: builtins.str - """The message that should be displayed to users.""" @property def timestamp(self) -> google.protobuf.timestamp_pb2.Timestamp: """The timestamp corresponding to when the event occurred.""" + message: builtins.str + """The message that should be displayed to users.""" def __init__( self, *, @@ -863,23 +863,22 @@ class PipelineEvent(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_message", b"_message", "message", b"message", "timestamp", b"timestamp" ], ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_message", b"_message", "message", b"message", "timestamp", b"timestamp" ], ) -> None: ... def WhichOneof( - self, oneof_group: typing.Literal["_message", b"_message"] - ) -> typing.Literal["message"] | None: ... + self, oneof_group: typing_extensions.Literal["_message", b"_message"] + ) -> typing_extensions.Literal["message"] | None: ... global___PipelineEvent = PipelineEvent -@typing.final class SourceCodeLocation(google.protobuf.message.Message): """Source code location information associated with a particular dataset or flow.""" @@ -899,7 +898,7 @@ class SourceCodeLocation(google.protobuf.message.Message): ) -> None: ... def HasField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_file_name", b"_file_name", "_line_number", @@ -912,7 +911,7 @@ class SourceCodeLocation(google.protobuf.message.Message): ) -> builtins.bool: ... def ClearField( self, - field_name: typing.Literal[ + field_name: typing_extensions.Literal[ "_file_name", b"_file_name", "_line_number", @@ -925,11 +924,11 @@ class SourceCodeLocation(google.protobuf.message.Message): ) -> None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_file_name", b"_file_name"] - ) -> typing.Literal["file_name"] | None: ... + self, oneof_group: typing_extensions.Literal["_file_name", b"_file_name"] + ) -> typing_extensions.Literal["file_name"] | None: ... @typing.overload def WhichOneof( - self, oneof_group: typing.Literal["_line_number", b"_line_number"] - ) -> typing.Literal["line_number"] | None: ... + self, oneof_group: typing_extensions.Literal["_line_number", b"_line_number"] + ) -> typing_extensions.Literal["line_number"] | None: ... 
global___SourceCodeLocation = SourceCodeLocation

From a1ce559893d0a8ae14902fb5aaf6605878a1c016 Mon Sep 17 00:00:00 2001
From: anishm-db
Date: Tue, 1 Jul 2025 21:52:22 +0000
Subject: [PATCH 3/4] run dev/reformat-python

---
 .../pipelines/spark_connect_graph_element_registry.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/pipelines/spark_connect_graph_element_registry.py b/python/pyspark/pipelines/spark_connect_graph_element_registry.py
index 860cc4fdca92f..39deae0c58623 100644
--- a/python/pyspark/pipelines/spark_connect_graph_element_registry.py
+++ b/python/pyspark/pipelines/spark_connect_graph_element_registry.py
@@ -29,9 +29,7 @@
 )
 from pyspark.pipelines.flow import Flow
 from pyspark.pipelines.graph_element_registry import GraphElementRegistry
-from pyspark.pipelines.source_code_location import (
-    SourceCodeLocation
-)
+from pyspark.pipelines.source_code_location import SourceCodeLocation
 from typing import Any, cast

 import pyspark.sql.connect.proto as pb2
@@ -116,9 +114,10 @@ def register_sql(self, sql_text: str, file_path: Path) -> None:
         command.pipeline_command.define_sql_graph_elements.CopyFrom(inner_command)
         self._client.execute_command(command)

+
 def source_code_location_to_proto(
     source_code_location: SourceCodeLocation,
 ) -> pb2.SourceCodeLocation:
     return pb2.SourceCodeLocation(
         file_name=source_code_location.filename, line_number=source_code_location.line_number
-    )
\ No newline at end of file
+    )

From 8a9dce2a5fea7b73d579b7d78a12fca410f12b4c Mon Sep 17 00:00:00 2001
From: anishm-db
Date: Wed, 2 Jul 2025 20:57:25 +0000
Subject: [PATCH 4/4] fix tests for python 3.10+

---
 python/pyspark/pipelines/source_code_location.py | 12 ++++++++----
 .../sql/connect/pipelines/PythonPipelineSuite.scala | 4 ++--
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/pipelines/source_code_location.py b/python/pyspark/pipelines/source_code_location.py
index b546a930f730d..cbf4cbe514a69 100644
--- a/python/pyspark/pipelines/source_code_location.py
+++ b/python/pyspark/pipelines/source_code_location.py
@@ -32,20 +32,20 @@ def get_caller_source_code_location(stacklevel: int) -> SourceCodeLocation:

     If this function is called from a decorator (ex. @sdp.table), note that the returned line
     number is affected by how the decorator was triggered - i.e. whether @sdp.table or @sdp.table()
-    was called.
+    was called, and which Python version is being used.

     Case 1:
     |@sdp.table()
     |def fn

-    @sdp.table() is executed immediately, on line 1
+    @sdp.table() is executed immediately, on line 1. This is true for all Python versions.

     Case 2:
     |@sdp.table
     |def fn

-    @sdp.table will expand to fn = sdp.table(fn), replacing the line that `fn` is defined on.
-    This would be line 2. More interestingly, this means:
+    In Python < 3.10, @sdp.table will expand to fn = sdp.table(fn), replacing the line that `fn` is
+    defined on. This would be line 2. More interestingly, this means:

     |@sdp.table
     |
@@ -54,6 +54,10 @@ def get_caller_source_code_location(stacklevel: int) -> SourceCodeLocation:

     Will expand to fn = sdp.table(fn) on line 4, where `fn` is defined.

+    However, in Python 3.10+, the line number in the stack trace will still be the line that the
+    decorator was defined on. In other words, case 2 will be treated the same as case 1, and the
+    line number will be 1.
+
     :param stacklevel: The number of stack frames to go up. 0 means the direct caller of this
         function, 1 means the caller of the caller, and so on.
""" diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 23d1401da5fc6..70550c0ba065d 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -174,7 +174,7 @@ class PythonPipelineSuite QueryOrigin( language = Option(Python()), filePath = Option(""), - line = Option(34), + line = Option(33), objectName = Option("spark_catalog.default.mv2"), objectType = Option(QueryOriginType.Flow.toString) ) @@ -190,7 +190,7 @@ class PythonPipelineSuite QueryOrigin( language = Option(Python()), filePath = Option(""), - line = Option(38), + line = Option(37), objectName = Option("spark_catalog.default.mv"), objectType = Option(QueryOriginType.Flow.toString) )