From dab999a3ecf564f53a5246572d5f3c4c1a7abce2 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 12:16:38 +0530 Subject: [PATCH 1/8] Fixed issue --- dbt/adapters/spark/impl.py | 116 ++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 59 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index d33ebde20..f3e7f6cab 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -171,31 +171,6 @@ def _get_relation_information(self, row: "agate.Row") -> RelationInfo: return _schema, name, information - def _get_relation_information_using_describe(self, row: "agate.Row") -> RelationInfo: - """Relation info fetched using SHOW TABLES and an auxiliary DESCRIBE statement""" - try: - _schema, name, _ = row - except ValueError: - raise DbtRuntimeError( - f'Invalid value from "show tables ...", got {len(row)} values, expected 3' - ) - - table_name = f"{_schema}.{name}" - try: - table_results = self.execute_macro( - DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs={"table_name": table_name} - ) - except DbtRuntimeError as e: - logger.debug(f"Error while retrieving information about {table_name}: {e.msg}") - table_results = AttrDict() - - information = "" - for info_row in table_results: - info_type, info_value, _ = info_row - if not info_type.startswith("#"): - information += f"{info_type}: {info_value}\n" - - return _schema, name, information def _build_spark_relation_list( self, @@ -230,48 +205,71 @@ def _build_spark_relation_list( def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]: """Distinct Spark compute engines may not support the same SQL featureset. 
Thus, we must try different methods to fetch relation information.""" - - kwargs = {"schema_relation": schema_relation} - try: - # Default compute engine behavior: show tables extended - show_table_extended_rows = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs) - return self._build_spark_relation_list( - row_list=show_table_extended_rows, - relation_info_func=self._get_relation_information, + # Iceberg behavior: 3-row result of relations obtained + show_table_rows = self.execute_macro( + LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs ) - except DbtRuntimeError as e: - errmsg = getattr(e, "msg", "") - if f"Database '{schema_relation}' not found" in errmsg: - return [] - # Iceberg compute engine behavior: show table - elif "SHOW TABLE EXTENDED is not supported for v2 tables" in errmsg: - # this happens with spark-iceberg with v2 iceberg tables - # https://issues.apache.org/jira/browse/SPARK-33393 - try: - # Iceberg behavior: 3-row result of relations obtained - show_table_rows = self.execute_macro( - LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs - ) - return self._build_spark_relation_list( - row_list=show_table_rows, - relation_info_func=self._get_relation_information_using_describe, - ) - except DbtRuntimeError as e: - description = "Error while retrieving information about" - logger.debug(f"{description} {schema_relation}: {e.msg}") - return [] - else: - logger.debug( - f"Error while retrieving information about {schema_relation}: {errmsg}" + for row in show_table_rows: + _schema, name, _ = row + information = "" + + rel_type: RelationType = ( + RelationType.View if "Type: VIEW" in information else RelationType.Table + ) + is_delta: bool = "Provider: delta" in information + is_hudi: bool = "Provider: hudi" in information + is_iceberg: bool = "Provider: iceberg" in information + + relation: BaseRelation = self.Relation.create( + schema=_schema, + identifier=name, + type=rel_type, + information=information, + is_delta=is_delta, + 
is_iceberg=is_iceberg, + is_hudi=is_hudi, ) - return [] + relations.append(relation) + return relations + except DbtRuntimeError as e: + description = "Error while retrieving information about" + logger.debug(f"{description} {schema_relation}: {e.msg}") + return [] + + def set_relation_information(self, relation: BaseRelation): + if relation.information: + return relation + rows: List[agate.Row] = super().get_columns_in_relations(relation) + information = "" + for info_row in rows: + info_type, info_value = info_row.values() + if not info_type.startswith("#"): + information += f"{info_type}: {info_value}\n" + rel_type: RelationType = ( + RelationType.View if "Type: VIEW" in information else RelationType.Table + ) + is_delta: bool = "Provider: delta" in information + is_hudi: bool = "Provider: hudi" in information + is_iceberg: bool = "Provider: iceberg" in information + relation: BaseRelation = self.Relation.create( + schema=_schema, + identifier=name, + type=rel_type, + information=information, + is_delta=is_delta, + is_iceberg=is_iceberg, + is_hudi=is_hudi, + ) + return relation + def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]: if not self.Relation.get_default_include_policy().database: database = None # type: ignore - return super().get_relation(database, schema, identifier) + relation = super().get_relation(database, schema, identifier) + self.set_relation_information(relation) if relation else None def parse_describe_extended( self, relation: BaseRelation, raw_rows: AttrDict From b8fc1d26e7d3d97d930edacf971c6b1a560bf090 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 12:22:52 +0530 Subject: [PATCH 2/8] Fixed issue --- dbt/adapters/spark/impl.py | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index f3e7f6cab..9398de7e8 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -172,36 +172,6 @@ 
def _get_relation_information(self, row: "agate.Row") -> RelationInfo: return _schema, name, information - def _build_spark_relation_list( - self, - row_list: "agate.Table", - relation_info_func: Callable[["agate.Row"], RelationInfo], - ) -> List[BaseRelation]: - """Aggregate relations with format metadata included.""" - relations = [] - for row in row_list: - _schema, name, information = relation_info_func(row) - - rel_type: RelationType = ( - RelationType.View if "Type: VIEW" in information else RelationType.Table - ) - is_delta: bool = "Provider: delta" in information - is_hudi: bool = "Provider: hudi" in information - is_iceberg: bool = "Provider: iceberg" in information - - relation: BaseRelation = self.Relation.create( - schema=_schema, - identifier=name, - type=rel_type, - information=information, - is_delta=is_delta, - is_iceberg=is_iceberg, - is_hudi=is_hudi, - ) - relations.append(relation) - - return relations - def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]: """Distinct Spark compute engines may not support the same SQL featureset. Thus, we must try different methods to fetch relation information.""" From 716208a99b61648463563b53f2c4159f72bd0b96 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 12:31:21 +0530 Subject: [PATCH 3/8] Fixed issue --- dbt/adapters/spark/impl.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 9398de7e8..192757aab 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -176,6 +176,7 @@ def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[ """Distinct Spark compute engines may not support the same SQL featureset. 
Thus, we must try different methods to fetch relation information.""" try: + kwargs = {"schema_relation": schema_relation} # Iceberg behavior: 3-row result of relations obtained show_table_rows = self.execute_macro( LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs @@ -239,7 +240,7 @@ def get_relation(self, database: str, schema: str, identifier: str) -> Optional[ database = None # type: ignore relation = super().get_relation(database, schema, identifier) - self.set_relation_information(relation) if relation else None + self.set_relation_information(relation) if relation else None def parse_describe_extended( self, relation: BaseRelation, raw_rows: AttrDict From c33e13d19b751bfc80c0fd882a88f971f93505ea Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 12:37:47 +0530 Subject: [PATCH 4/8] Fixed issue --- dbt/adapters/spark/impl.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 9398de7e8..192757aab 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -176,6 +176,7 @@ def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[ """Distinct Spark compute engines may not support the same SQL featureset. 
Thus, we must try different methods to fetch relation information.""" try: + kwargs = {"schema_relation": schema_relation} # Iceberg behavior: 3-row result of relations obtained show_table_rows = self.execute_macro( LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs @@ -239,7 +240,7 @@ def get_relation(self, database: str, schema: str, identifier: str) -> Optional[ database = None # type: ignore relation = super().get_relation(database, schema, identifier) - self.set_relation_information(relation) if relation else None + self.set_relation_information(relation) if relation else None def parse_describe_extended( self, relation: BaseRelation, raw_rows: AttrDict From f5bd5baf10fe8f5a8b19349d35a7079aca4b5cad Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 12:54:16 +0530 Subject: [PATCH 5/8] Fixed issue --- dbt/adapters/spark/impl.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 192757aab..ab95e85f2 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -176,6 +176,7 @@ def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[ """Distinct Spark compute engines may not support the same SQL featureset. 
Thus, we must try different methods to fetch relation information.""" try: + relations = [] kwargs = {"schema_relation": schema_relation} # Iceberg behavior: 3-row result of relations obtained show_table_rows = self.execute_macro( From 0060275172499f2d86fc917a341c5a878f0bb04f Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 16:26:16 +0530 Subject: [PATCH 6/8] Fixed defect --- dbt/adapters/spark/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index ab95e85f2..8e823664d 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -212,7 +212,7 @@ def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[ def set_relation_information(self, relation: BaseRelation): if relation.information: return relation - rows: List[agate.Row] = super().get_columns_in_relations(relation) + rows: List[agate.Row] = super().get_columns_in_relation(relation) information = "" for info_row in rows: info_type, info_value = info_row.values() From 7046a13c93d54f335c10796b67881663d74d6f3a Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 16:59:26 +0530 Subject: [PATCH 7/8] Fixed issue --- dbt/adapters/spark/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 8e823664d..bddb7e7dd 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -215,7 +215,7 @@ def set_relation_information(self, relation: BaseRelation): rows: List[agate.Row] = super().get_columns_in_relation(relation) information = "" for info_row in rows: - info_type, info_value = info_row.values() + info_type, info_value, _ = info_row.values() if not info_type.startswith("#"): information += f"{info_type}: {info_value}\n" rel_type: RelationType = ( From 5df3ffe2a79a1852335a118968a9033ec4870879 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 17:12:45 +0530 Subject: 
[PATCH 8/8] Fixed issue --- dbt/adapters/spark/impl.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index bddb7e7dd..1dc4b09c3 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -225,8 +225,8 @@ def set_relation_information(self, relation: BaseRelation): is_hudi: bool = "Provider: hudi" in information is_iceberg: bool = "Provider: iceberg" in information relation: BaseRelation = self.Relation.create( - schema=_schema, - identifier=name, + schema=relation.schema, + identifier=relation.identifier, type=rel_type, information=information, is_delta=is_delta,