diff --git a/python/pyspark/pipelines/source_code_location.py b/python/pyspark/pipelines/source_code_location.py
index 5f23b819abd85..cbf4cbe514a69 100644
--- a/python/pyspark/pipelines/source_code_location.py
+++ b/python/pyspark/pipelines/source_code_location.py
@@ -30,6 +30,34 @@ def get_caller_source_code_location(stacklevel: int) -> SourceCodeLocation:
     """
     Returns a SourceCodeLocation object representing the location of the code that invokes this
     function.
+    If this function is called from a decorator (e.g. @sdp.table), note that the returned line
+    number depends on how the decorator was applied - i.e. whether @sdp.table or @sdp.table()
+    was used - and on the Python version.
+
+    Case 1:
+    |@sdp.table()
+    |def fn
+
+    @sdp.table() is executed immediately, on line 1. This is true for all Python versions.
+
+    Case 2:
+    |@sdp.table
+    |def fn
+
+    In Python < 3.10, @sdp.table expands to fn = sdp.table(fn), and the reported line is the one
+    that `fn` is defined on. This would be line 2. Notably, this means that:
+
+    |@sdp.table
+    |
+    |
+    |def fn
+
+    will expand to fn = sdp.table(fn) on line 4, where `fn` is defined.
+
+    However, in Python 3.10+, the line number in the stack trace is still the line that the
+    decorator was applied on. In other words, case 2 is treated the same as case 1, and the
+    line number will be 1.
+
     :param stacklevel: The number of stack frames to go up. 0 means the direct caller of this
         function, 1 means the caller of the caller, and so on.
     """
diff --git a/python/pyspark/pipelines/spark_connect_graph_element_registry.py b/python/pyspark/pipelines/spark_connect_graph_element_registry.py
index 8bc4aeefd2264..39deae0c58623 100644
--- a/python/pyspark/pipelines/spark_connect_graph_element_registry.py
+++ b/python/pyspark/pipelines/spark_connect_graph_element_registry.py
@@ -29,6 +29,7 @@
 )
 from pyspark.pipelines.flow import Flow
 from pyspark.pipelines.graph_element_registry import GraphElementRegistry
+from pyspark.pipelines.source_code_location import SourceCodeLocation
 from typing import Any, cast
 
 import pyspark.sql.connect.proto as pb2
@@ -79,6 +80,7 @@ def register_dataset(self, dataset: Dataset) -> None:
             partition_cols=partition_cols,
             schema=schema,
             format=format,
+            source_code_location=source_code_location_to_proto(dataset.source_code_location),
         )
         command = pb2.Command()
         command.pipeline_command.define_dataset.CopyFrom(inner_command)
@@ -96,6 +98,7 @@ def register_flow(self, flow: Flow) -> None:
             plan=relation,
             sql_conf=flow.spark_conf,
             once=flow.once,
+            source_code_location=source_code_location_to_proto(flow.source_code_location),
         )
         command = pb2.Command()
         command.pipeline_command.define_flow.CopyFrom(inner_command)
@@ -110,3 +113,11 @@ def register_sql(self, sql_text: str, file_path: Path) -> None:
         command = pb2.Command()
         command.pipeline_command.define_sql_graph_elements.CopyFrom(inner_command)
         self._client.execute_command(command)
+
+
+def source_code_location_to_proto(
+    source_code_location: SourceCodeLocation,
+) -> pb2.SourceCodeLocation:
+    return pb2.SourceCodeLocation(
+        file_name=source_code_location.filename, line_number=source_code_location.line_number
+    )
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index e75c635a2658a..d89e52ab1ed2a 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -40,7 +40,7 @@
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xe6\x12\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xd1\x04\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_format\x1a\xbc\x03\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x30\n\x04plan\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x04plan\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12\x17\n\x04once\x18\x06 \x01(\x08H\x04R\x04once\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 
\x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x07\n\x05_planB\x07\n\x05_once\x1aQ\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_textB\x0e\n\x0c\x63ommand_type"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xcc\x14\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xc4\x05\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 
\x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x12X\n\x14source_code_location\x18\t \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x06R\x12sourceCodeLocation\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_formatB\x17\n\x15_source_code_location\x1a\xaf\x04\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x30\n\x04plan\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x04plan\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12\x17\n\x04once\x18\x06 \x01(\x08H\x04R\x04once\x88\x01\x01\x12X\n\x14source_code_location\x18\x07 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x07\n\x05_planB\x07\n\x05_onceB\x17\n\x15_source_code_location\x1aQ\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_textB\x0e\n\x0c\x63ommand_type"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"z\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 
\x01(\x05H\x01R\nlineNumber\x88\x01\x01\x42\x0c\n\n_file_nameB\x0e\n\x0c_line_number*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _globals = globals() @@ -59,10 +59,10 @@ _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_options = b"8\001" _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_DATASETTYPE"]._serialized_start = 3014 - _globals["_DATASETTYPE"]._serialized_end = 3111 + _globals["_DATASETTYPE"]._serialized_start = 3368 + _globals["_DATASETTYPE"]._serialized_end = 3465 _globals["_PIPELINECOMMAND"]._serialized_start = 140 - _globals["_PIPELINECOMMAND"]._serialized_end = 2546 + _globals["_PIPELINECOMMAND"]._serialized_end = 2776 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 719 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1110 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 928 @@ -72,23 +72,25 @@ _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1112 _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1202 _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_start = 1205 - _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 1798 - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_start = 1642 - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_end = 1708 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 1801 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2245 + _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 1913 + _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_start = 1732 + _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_end = 1798 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 1916 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2475 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 928 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 986 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2247 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2328 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2331 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2530 - _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2549 - _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 2819 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2706 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 2804 - _globals["_PIPELINEEVENTRESULT"]._serialized_start = 2821 - _globals["_PIPELINEEVENTRESULT"]._serialized_end = 2894 - _globals["_PIPELINEEVENT"]._serialized_start = 2896 - _globals["_PIPELINEEVENT"]._serialized_end = 3012 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2477 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2558 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2561 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2760 + _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2779 + 
_globals["_PIPELINECOMMANDRESULT"]._serialized_end = 3049 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2936 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 3034 + _globals["_PIPELINEEVENTRESULT"]._serialized_start = 3051 + _globals["_PIPELINEEVENTRESULT"]._serialized_end = 3124 + _globals["_PIPELINEEVENT"]._serialized_start = 3126 + _globals["_PIPELINEEVENT"]._serialized_end = 3242 + _globals["_SOURCECODELOCATION"]._serialized_start = 3244 + _globals["_SOURCECODELOCATION"]._serialized_end = 3366 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi index cf2cb8d3053b7..42387297f1a09 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi @@ -266,6 +266,7 @@ class PipelineCommand(google.protobuf.message.Message): PARTITION_COLS_FIELD_NUMBER: builtins.int SCHEMA_FIELD_NUMBER: builtins.int FORMAT_FIELD_NUMBER: builtins.int + SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this dataset to.""" dataset_name: builtins.str @@ -293,6 +294,9 @@ class PipelineCommand(google.protobuf.message.Message): """The output table format of the dataset. Only applies to dataset_type == TABLE and dataset_type == MATERIALIZED_VIEW. """ + @property + def source_code_location(self) -> global___SourceCodeLocation: + """The location in source code that this dataset was defined.""" def __init__( self, *, @@ -304,6 +308,7 @@ class PipelineCommand(google.protobuf.message.Message): partition_cols: collections.abc.Iterable[builtins.str] | None = ..., schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ..., format: builtins.str | None = ..., + source_code_location: global___SourceCodeLocation | None = ..., ) -> None: ... def HasField( self, @@ -320,6 +325,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_format", "_schema", b"_schema", + "_source_code_location", + b"_source_code_location", "comment", b"comment", "dataflow_graph_id", @@ -332,6 +339,8 @@ class PipelineCommand(google.protobuf.message.Message): b"format", "schema", b"schema", + "source_code_location", + b"source_code_location", ], ) -> builtins.bool: ... def ClearField( @@ -349,6 +358,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_format", "_schema", b"_schema", + "_source_code_location", + b"_source_code_location", "comment", b"comment", "dataflow_graph_id", @@ -363,6 +374,8 @@ class PipelineCommand(google.protobuf.message.Message): b"partition_cols", "schema", b"schema", + "source_code_location", + b"source_code_location", "table_properties", b"table_properties", ], @@ -392,6 +405,13 @@ class PipelineCommand(google.protobuf.message.Message): def WhichOneof( self, oneof_group: typing_extensions.Literal["_schema", b"_schema"] ) -> typing_extensions.Literal["schema"] | None: ... + @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_source_code_location", b"_source_code_location" + ], + ) -> typing_extensions.Literal["source_code_location"] | None: ... 
class DefineFlow(google.protobuf.message.Message): """Request to define a flow targeting a dataset.""" @@ -421,6 +441,7 @@ class PipelineCommand(google.protobuf.message.Message): PLAN_FIELD_NUMBER: builtins.int SQL_CONF_FIELD_NUMBER: builtins.int ONCE_FIELD_NUMBER: builtins.int + SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this flow to.""" flow_name: builtins.str @@ -437,6 +458,9 @@ class PipelineCommand(google.protobuf.message.Message): """SQL configurations set when running this flow.""" once: builtins.bool """If true, this flow will only be run once per full refresh.""" + @property + def source_code_location(self) -> global___SourceCodeLocation: + """The location in source code that this flow was defined.""" def __init__( self, *, @@ -446,6 +470,7 @@ class PipelineCommand(google.protobuf.message.Message): plan: pyspark.sql.connect.proto.relations_pb2.Relation | None = ..., sql_conf: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., once: builtins.bool | None = ..., + source_code_location: global___SourceCodeLocation | None = ..., ) -> None: ... def HasField( self, @@ -458,6 +483,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_once", "_plan", b"_plan", + "_source_code_location", + b"_source_code_location", "_target_dataset_name", b"_target_dataset_name", "dataflow_graph_id", @@ -468,6 +495,8 @@ class PipelineCommand(google.protobuf.message.Message): b"once", "plan", b"plan", + "source_code_location", + b"source_code_location", "target_dataset_name", b"target_dataset_name", ], @@ -483,6 +512,8 @@ class PipelineCommand(google.protobuf.message.Message): b"_once", "_plan", b"_plan", + "_source_code_location", + b"_source_code_location", "_target_dataset_name", b"_target_dataset_name", "dataflow_graph_id", @@ -493,6 +524,8 @@ class PipelineCommand(google.protobuf.message.Message): b"once", "plan", b"plan", + "source_code_location", + b"source_code_location", "sql_conf", b"sql_conf", "target_dataset_name", @@ -517,6 +550,13 @@ class PipelineCommand(google.protobuf.message.Message): self, oneof_group: typing_extensions.Literal["_plan", b"_plan"] ) -> typing_extensions.Literal["plan"] | None: ... @typing.overload + def WhichOneof( + self, + oneof_group: typing_extensions.Literal[ + "_source_code_location", b"_source_code_location" + ], + ) -> typing_extensions.Literal["source_code_location"] | None: ... + @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_target_dataset_name", b"_target_dataset_name"], @@ -838,3 +878,57 @@ class PipelineEvent(google.protobuf.message.Message): ) -> typing_extensions.Literal["message"] | None: ... global___PipelineEvent = PipelineEvent + +class SourceCodeLocation(google.protobuf.message.Message): + """Source code location information associated with a particular dataset or flow.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + FILE_NAME_FIELD_NUMBER: builtins.int + LINE_NUMBER_FIELD_NUMBER: builtins.int + file_name: builtins.str + """The file that this pipeline source code was defined in.""" + line_number: builtins.int + """The specific line number that this pipeline source code is located at, if applicable.""" + def __init__( + self, + *, + file_name: builtins.str | None = ..., + line_number: builtins.int | None = ..., + ) -> None: ... 
+ def HasField( + self, + field_name: typing_extensions.Literal[ + "_file_name", + b"_file_name", + "_line_number", + b"_line_number", + "file_name", + b"file_name", + "line_number", + b"line_number", + ], + ) -> builtins.bool: ... + def ClearField( + self, + field_name: typing_extensions.Literal[ + "_file_name", + b"_file_name", + "_line_number", + b"_line_number", + "file_name", + b"file_name", + "line_number", + b"line_number", + ], + ) -> None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_file_name", b"_file_name"] + ) -> typing_extensions.Literal["file_name"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_line_number", b"_line_number"] + ) -> typing_extensions.Literal["line_number"] | None: ... + +global___SourceCodeLocation = SourceCodeLocation diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index f4f1d3b043d33..5279e046a6cb4 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -88,6 +88,9 @@ message PipelineCommand { // The output table format of the dataset. Only applies to dataset_type == TABLE and // dataset_type == MATERIALIZED_VIEW. optional string format = 8; + + // The location in source code that this dataset was defined. + optional SourceCodeLocation source_code_location = 9; } // Request to define a flow targeting a dataset. @@ -109,6 +112,9 @@ message PipelineCommand { // If true, this flow will only be run once per full refresh. optional bool once = 6; + + // The location in source code that this flow was defined. + optional SourceCodeLocation source_code_location = 7; } // Resolves all datasets and flows and start a pipeline update. Should be called after all @@ -166,3 +172,11 @@ message PipelineEvent { // The message that should be displayed to users. optional string message = 2; } + +// Source code location information associated with a particular dataset or flow. +message SourceCodeLocation { + // The file that this pipeline source code was defined in. + optional string file_name = 1; + // The specific line number that this pipeline source code is located at, if applicable. 
+ optional int32 line_number = 2; +} diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index 92cb5bcac4ee0..a1bcfa9dba498 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -156,6 +156,10 @@ private[connect] object PipelinesHandler extends Logging { .filter(_.nonEmpty), properties = dataset.getTablePropertiesMap.asScala.toMap, baseOrigin = QueryOrigin( + filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)( + dataset.getSourceCodeLocation.getFileName), + line = Option.when(dataset.getSourceCodeLocation.hasLineNumber)( + dataset.getSourceCodeLocation.getLineNumber), objectType = Option(QueryOriginType.Table.toString), objectName = Option(tableIdentifier.unquotedString), language = Option(Python())), @@ -171,6 +175,10 @@ private[connect] object PipelinesHandler extends Logging { identifier = viewIdentifier, comment = Option(dataset.getComment), origin = QueryOrigin( + filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)( + dataset.getSourceCodeLocation.getFileName), + line = Option.when(dataset.getSourceCodeLocation.hasLineNumber)( + dataset.getSourceCodeLocation.getLineNumber), objectType = Option(QueryOriginType.View.toString), objectName = Option(viewIdentifier.unquotedString), language = Option(Python())), @@ -214,6 +222,10 @@ private[connect] object PipelinesHandler extends Logging { Option(graphElementRegistry.defaultDatabase)), comment = None, origin = QueryOrigin( + filePath = Option.when(flow.getSourceCodeLocation.hasFileName)( + flow.getSourceCodeLocation.getFileName), + line = Option.when(flow.getSourceCodeLocation.hasLineNumber)( + flow.getSourceCodeLocation.getLineNumber), objectType = Option(QueryOriginType.Flow.toString), objectName = Option(flowIdentifier.unquotedString), language = Option(Python())))) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 21f2857090182..70550c0ba065d 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -28,7 +28,11 @@ import scala.util.Try import org.apache.spark.api.python.PythonUtils import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.pipelines.graph.DataflowGraph +import org.apache.spark.sql.pipelines.Language.Python +import org.apache.spark.sql.pipelines.QueryOriginType +import org.apache.spark.sql.pipelines.common.FlowStatus +import org.apache.spark.sql.pipelines.graph.{DataflowGraph, QueryOrigin} +import org.apache.spark.sql.pipelines.logging.EventLevel import org.apache.spark.sql.pipelines.utils.{EventVerificationTestHelpers, TestPipelineUpdateContextMixin} /** @@ -101,6 +105,137 @@ class PythonPipelineSuite assert(graph.tables.size == 1) } + test("failed flow progress event has correct python source code location") { + // Note that pythonText will be inserted into line 26 of the python script that is run. 
+ val unresolvedGraph = buildGraph(pythonText = """ + |@sdp.table() + |def table1(): + | df = spark.createDataFrame([(25,), (30,), (45,)], ["age"]) + | return df.select("name") + |""".stripMargin) + + val updateContext = TestPipelineUpdateContext(spark, unresolvedGraph) + updateContext.pipelineExecution.runPipeline() + + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("table1"), + expectedFlowStatus = FlowStatus.FAILED, + cond = flowProgressEvent => flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(27), + objectName = Option("spark_catalog.default.table1"), + objectType = Option(QueryOriginType.Flow.toString) + ) + ), + errorChecker = ex => ex.getMessage.contains( + "A column, variable, or function parameter with name `name` cannot be resolved."), + expectedEventLevel = EventLevel.WARN + ) + } + + test("flow progress events have correct python source code location") { + val unresolvedGraph = buildGraph(pythonText = """ + |@sdp.table( + | comment = 'my table' + |) + |def table1(): + | return spark.readStream.table('mv') + | + |@sdp.materialized_view + |def mv2(): + | return spark.range(26, 29) + | + |@sdp.materialized_view + |def mv(): + | df = spark.createDataFrame([(25,), (30,), (45,)], ["age"]) + | return df.select("age") + | + |@sdp.append_flow( + | target = 'table1' + |) + |def standalone_flow1(): + | return spark.readStream.table('mv2') + |""".stripMargin) + + val updateContext = TestPipelineUpdateContext(spark, unresolvedGraph) + updateContext.pipelineExecution.runPipeline() + updateContext.pipelineExecution.awaitCompletion() + + Seq(FlowStatus.QUEUED, FlowStatus.STARTING, + FlowStatus.PLANNING, FlowStatus.RUNNING, FlowStatus.COMPLETED).foreach { flowStatus => + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("mv2"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(33), + objectName = Option("spark_catalog.default.mv2"), + objectType = Option(QueryOriginType.Flow.toString) + ) + ), + expectedEventLevel = EventLevel.INFO + ) + + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("mv"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(37), + objectName = Option("spark_catalog.default.mv"), + objectType = Option(QueryOriginType.Flow.toString) + ) + ), + expectedEventLevel = EventLevel.INFO + ) + } + + // Note that streaming flows do not have a PLANNING phase. 
+ Seq(FlowStatus.QUEUED, FlowStatus.STARTING, FlowStatus.RUNNING, FlowStatus.COMPLETED).foreach { + flowStatus => + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("table1"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(27), + objectName = Option("spark_catalog.default.table1"), + objectType = Option(QueryOriginType.Flow.toString) + ) + ), + expectedEventLevel = EventLevel.INFO + ) + + assertFlowProgressEvent( + updateContext.eventBuffer, + identifier = graphIdentifier("standalone_flow1"), + expectedFlowStatus = flowStatus, + cond = flowProgressEvent => flowProgressEvent.origin.sourceCodeLocation == Option( + QueryOrigin( + language = Option(Python()), + filePath = Option(""), + line = Option(42), + objectName = Option("spark_catalog.default.standalone_flow1"), + objectType = Option(QueryOriginType.Flow.toString) + ) + ), + expectedEventLevel = EventLevel.INFO + ) + } + } + test("basic with inverted topological order") { // This graph is purposefully in the wrong topological order to test the topological sort val graph = buildGraph(""" diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala index d33924c2e1c37..a1b029b653842 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala @@ -129,7 +129,8 @@ private class FlowResolver(rawGraph: DataflowGraph) { allInputs = allInputs, availableInputs = availableResolvedInputs.values.toList, configuration = flowToResolve.sqlConf, - queryContext = flowToResolve.queryContext + queryContext = flowToResolve.queryContext, + queryOrigin = flowToResolve.origin ) val result = flowFunctionResult match { @@ -175,7 +176,8 @@ private class FlowResolver(rawGraph: DataflowGraph) { allInputs = allInputs, availableInputs = availableResolvedInputs.values.toList, configuration = newSqlConf, - queryContext = flowToResolve.queryContext + queryContext = flowToResolve.queryContext, + queryOrigin = flowToResolve.origin ) } else { f diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala index 2378b6f8d96a6..0948c100b86b1 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala @@ -75,13 +75,16 @@ trait FlowFunction extends Logging { * @param availableInputs the list of all [[Input]]s available to this flow * @param configuration the spark configurations that apply to this flow. * @param queryContext The context of the query being evaluated. + * @param queryOrigin The source code location of the flow definition this flow function was + * instantiated from. 
* @return the inputs actually used, and the DataFrame expression for the flow */ def call( allInputs: Set[TableIdentifier], availableInputs: Seq[Input], configuration: Map[String, String], - queryContext: QueryContext + queryContext: QueryContext, + queryOrigin: QueryOrigin ): FlowFunctionResult } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala index 7e2e97f2b5d74..984e7d3925b49 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala @@ -46,13 +46,15 @@ object FlowAnalysis { allInputs: Set[TableIdentifier], availableInputs: Seq[Input], confs: Map[String, String], - queryContext: QueryContext + queryContext: QueryContext, + queryOrigin: QueryOrigin ): FlowFunctionResult = { val ctx = FlowAnalysisContext( allInputs = allInputs, availableInputs = availableInputs, queryContext = queryContext, - spark = SparkSession.active + spark = SparkSession.active, + queryOrigin = queryOrigin ) val df = try { confs.foreach { case (k, v) => ctx.setConf(k, v) } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala index 1139946df59ac..fa8689e5ad608 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala @@ -46,7 +46,8 @@ private[pipelines] case class FlowAnalysisContext( shouldLowerCaseNames: Boolean = false, analysisWarnings: mutable.Buffer[AnalysisWarning] = new ListBuffer[AnalysisWarning], spark: SparkSession, - externalInputs: mutable.HashSet[TableIdentifier] = mutable.HashSet.empty + externalInputs: mutable.HashSet[TableIdentifier] = mutable.HashSet.empty, + queryOrigin: QueryOrigin ) { /** Map from `Input` name to the actual `Input` */ diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala index 0e2ba42b15e59..1ad327b1e6c38 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala @@ -49,35 +49,54 @@ class GraphRegistrationContext( flows += flowDef.copy(sqlConf = defaultSqlConf ++ flowDef.sqlConf) } + /** + * Collects all graph registered entities (tables, views, flows) into a DataflowGraph object, + * for graph analysis/resolution/execution. Also responsible for fully qualifying any partially + * qualified user-specified identifiers using the pipeline-level catalog and database. If + * identifiers have already been qualified during graph element registration, that qualification + * is respected. 
+ */ def toDataflowGraph: DataflowGraph = { val qualifiedTables = tables.toSeq.map { t => + val fullyQualifiedTableIdentifier = GraphIdentifierManager + .parseAndQualifyTableIdentifier( + rawTableIdentifier = t.identifier, + currentCatalog = Some(defaultCatalog), + currentDatabase = Some(defaultDatabase) + ) + .identifier t.copy( - identifier = GraphIdentifierManager - .parseAndQualifyTableIdentifier( - rawTableIdentifier = t.identifier, - currentCatalog = Some(defaultCatalog), - currentDatabase = Some(defaultDatabase) - ) - .identifier + identifier = fullyQualifiedTableIdentifier, + baseOrigin = t.baseOrigin.copy( + objectName = Option(fullyQualifiedTableIdentifier.unquotedString) + ) ) } val validatedViews = views.toSeq.collect { case v: TemporaryView => + val parsedAndValidatedTemporaryViewIdentifier = GraphIdentifierManager + .parseAndValidateTemporaryViewIdentifier( + rawViewIdentifier = v.identifier + ) v.copy( - identifier = GraphIdentifierManager - .parseAndValidateTemporaryViewIdentifier( - rawViewIdentifier = v.identifier - ) + identifier = parsedAndValidatedTemporaryViewIdentifier, + origin = v.origin.copy( + objectName = Option(parsedAndValidatedTemporaryViewIdentifier.unquotedString) + ) ) case v: PersistedView => + val fullyQualifiedPersistedViewIdentifier = GraphIdentifierManager + .parseAndValidatePersistedViewIdentifier( + rawViewIdentifier = v.identifier, + currentCatalog = Some(defaultCatalog), + currentDatabase = Some(defaultDatabase) + ) v.copy( - identifier = GraphIdentifierManager - .parseAndValidatePersistedViewIdentifier( - rawViewIdentifier = v.identifier, - currentCatalog = Some(defaultCatalog), - currentDatabase = Some(defaultDatabase) - ) + identifier = fullyQualifiedPersistedViewIdentifier, + origin = v.origin.copy( + objectName = Option(fullyQualifiedPersistedViewIdentifier.unquotedString) + ) ) } @@ -94,21 +113,25 @@ class GraphRegistrationContext( if (isImplicitFlow && flowWritesToView) { f } else { + val fullyQualifiedFlowIdentifier = GraphIdentifierManager + .parseAndQualifyFlowIdentifier( + rawFlowIdentifier = f.identifier, + currentCatalog = Some(defaultCatalog), + currentDatabase = Some(defaultDatabase) + ) + .identifier f.copy( - identifier = GraphIdentifierManager - .parseAndQualifyFlowIdentifier( - rawFlowIdentifier = f.identifier, - currentCatalog = Some(defaultCatalog), - currentDatabase = Some(defaultDatabase) - ) - .identifier, + identifier = fullyQualifiedFlowIdentifier, destinationIdentifier = GraphIdentifierManager .parseAndQualifyFlowIdentifier( rawFlowIdentifier = f.destinationIdentifier, currentCatalog = Some(defaultCatalog), currentDatabase = Some(defaultDatabase) ) - .identifier + .identifier, + origin = f.origin.copy( + objectName = Option(fullyQualifiedFlowIdentifier.unquotedString) + ) ) } }
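For reference, the decorator behavior described in the source_code_location.py docstring above can be reproduced with a minimal standalone sketch. The names below (capture_location, table) are illustrative stand-ins rather than the actual sdp API; the sketch only shows how a decorator that records its caller's file and line via inspect.stack() can report different line numbers for @table versus @table() depending on the Python version.

import inspect
from typing import Callable, Optional


def capture_location(stacklevel: int) -> tuple:
    # inspect.stack()[0] is this frame, so stacklevel=0 resolves to the direct
    # caller, stacklevel=1 to the caller's caller, and so on.
    frame = inspect.stack()[stacklevel + 1]
    return frame.filename, frame.lineno


def table(func: Optional[Callable] = None):
    # Capture the decoration site immediately, before any wrapping happens.
    # stacklevel=1 skips this `table` frame and lands on the module frame that
    # applied the decorator.
    location = capture_location(stacklevel=1)

    def register(f: Callable) -> Callable:
        print(f"{f.__name__} registered at {location}")
        return f

    # Support both the bare form (@table) and the call form (@table()).
    return register if func is None else register(func)


@table          # bare form: per the docstring, Python < 3.10 reports the `def fn` line, 3.10+ reports this line
def fn():
    pass


@table()        # call form: reports this line on every Python version
def fn2():
    pass

Running this sketch under Python 3.9 and again under 3.10+ should show the difference for the bare form, matching the Case 1/Case 2 discussion in the docstring.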