From 107769d4ab3f70518f2d7f859924687ec6c44acd Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 17:26:05 +0530 Subject: [PATCH 1/4] Fixed issue --- dbt/adapters/spark/impl.py | 148 +++++++++++++++---------------------- 1 file changed, 59 insertions(+), 89 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index d33ebde20..4d1434cf7 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -171,107 +171,77 @@ def _get_relation_information(self, row: "agate.Row") -> RelationInfo: return _schema, name, information - def _get_relation_information_using_describe(self, row: "agate.Row") -> RelationInfo: - """Relation info fetched using SHOW TABLES and an auxiliary DESCRIBE statement""" - try: - _schema, name, _ = row - except ValueError: - raise DbtRuntimeError( - f'Invalid value from "show tables ...", got {len(row)} values, expected 3' - ) - table_name = f"{_schema}.{name}" + def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]: + """Distinct Spark compute engines may not support the same SQL featureset. Thus, we must + try different methods to fetch relation information.""" try: - table_results = self.execute_macro( - DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs={"table_name": table_name} + relations = [] + kwargs = {"schema_relation": schema_relation} + # Iceberg behavior: 3-row result of relations obtained + show_table_rows = self.execute_macro( + LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs ) - except DbtRuntimeError as e: - logger.debug(f"Error while retrieving information about {table_name}: {e.msg}") - table_results = AttrDict() + for row in show_table_rows: + _schema, name, _ = row + information = "" + rel_type: RelationType = ( + RelationType.View if "Type: VIEW" in information else RelationType.Table + ) + is_delta: bool = "Provider: delta" in information + is_hudi: bool = "Provider: hudi" in information + is_iceberg: bool = "Provider: iceberg" in information + + relation: BaseRelation = self.Relation.create( + schema=_schema, + identifier=name, + type=rel_type, + information=information, + is_delta=is_delta, + is_iceberg=is_iceberg, + is_hudi=is_hudi, + ) + relations.append(relation) + return relations + except DbtRuntimeError as e: + description = "Error while retrieving information about" + logger.debug(f"{description} {schema_relation}: {e.msg}") + return [] + + def set_relation_information(self, relation: BaseRelation): + if relation.information: + return relation + rows: List[agate.Row] = super().get_columns_in_relation(relation) information = "" - for info_row in table_results: - info_type, info_value, _ = info_row + for info_row in rows: + info_type, info_value, _ = info_row.values() if not info_type.startswith("#"): information += f"{info_type}: {info_value}\n" + rel_type: RelationType = ( + RelationType.View if "Type: VIEW" in information else RelationType.Table + ) + is_delta: bool = "Provider: delta" in information + is_hudi: bool = "Provider: hudi" in information + is_iceberg: bool = "Provider: iceberg" in information + relation: BaseRelation = self.Relation.create( + schema=relation.schema, + identifier=relation.identifier, + type=rel_type, + information=information, + is_delta=is_delta, + is_iceberg=is_iceberg, + is_hudi=is_hudi, + ) + return relation - return _schema, name, information - - def _build_spark_relation_list( - self, - row_list: "agate.Table", - relation_info_func: Callable[["agate.Row"], RelationInfo], - ) -> List[BaseRelation]: - """Aggregate relations with format metadata included.""" - relations = [] - for row in row_list: - _schema, name, information = relation_info_func(row) - - rel_type: RelationType = ( - RelationType.View if "Type: VIEW" in information else RelationType.Table - ) - is_delta: bool = "Provider: delta" in information - is_hudi: bool = "Provider: hudi" in information - is_iceberg: bool = "Provider: iceberg" in information - - relation: BaseRelation = self.Relation.create( - schema=_schema, - identifier=name, - type=rel_type, - information=information, - is_delta=is_delta, - is_iceberg=is_iceberg, - is_hudi=is_hudi, - ) - relations.append(relation) - - return relations - - def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]: - """Distinct Spark compute engines may not support the same SQL featureset. Thus, we must - try different methods to fetch relation information.""" - - kwargs = {"schema_relation": schema_relation} - - try: - # Default compute engine behavior: show tables extended - show_table_extended_rows = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs) - return self._build_spark_relation_list( - row_list=show_table_extended_rows, - relation_info_func=self._get_relation_information, - ) - except DbtRuntimeError as e: - errmsg = getattr(e, "msg", "") - if f"Database '{schema_relation}' not found" in errmsg: - return [] - # Iceberg compute engine behavior: show table - elif "SHOW TABLE EXTENDED is not supported for v2 tables" in errmsg: - # this happens with spark-iceberg with v2 iceberg tables - # https://issues.apache.org/jira/browse/SPARK-33393 - try: - # Iceberg behavior: 3-row result of relations obtained - show_table_rows = self.execute_macro( - LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs - ) - return self._build_spark_relation_list( - row_list=show_table_rows, - relation_info_func=self._get_relation_information_using_describe, - ) - except DbtRuntimeError as e: - description = "Error while retrieving information about" - logger.debug(f"{description} {schema_relation}: {e.msg}") - return [] - else: - logger.debug( - f"Error while retrieving information about {schema_relation}: {errmsg}" - ) - return [] def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]: if not self.Relation.get_default_include_policy().database: database = None # type: ignore - return super().get_relation(database, schema, identifier) + relation = super().get_relation(database, schema, identifier) + self.set_relation_information(relation) if relation else None def parse_describe_extended( self, relation: BaseRelation, raw_rows: AttrDict @@ -549,4 +519,4 @@ def debug_query(self) -> None: diff_count.num_missing as num_mismatched from row_count_diff cross join diff_count -""".strip() +""".strip() \ No newline at end of file From 92ba3bf69d9f40f0c95323567cd3837612b6055c Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 17:34:21 +0530 Subject: [PATCH 2/4] Fixed issue --- dbt/adapters/spark/impl.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 4d1434cf7..adb7d5b51 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -171,7 +171,6 @@ def _get_relation_information(self, row: "agate.Row") -> RelationInfo: return _schema, name, information - def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]: """Distinct Spark compute engines may not support the same SQL featureset. Thus, we must try different methods to fetch relation information.""" @@ -209,7 +208,7 @@ def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[ logger.debug(f"{description} {schema_relation}: {e.msg}") return [] - def set_relation_information(self, relation: BaseRelation): + def set_relation_information(self, relation: BaseRelation) -> BaseRelation: if relation.information: return relation rows: List[agate.Row] = super().get_columns_in_relation(relation) @@ -224,7 +223,7 @@ def set_relation_information(self, relation: BaseRelation): is_delta: bool = "Provider: delta" in information is_hudi: bool = "Provider: hudi" in information is_iceberg: bool = "Provider: iceberg" in information - relation: BaseRelation = self.Relation.create( + updated_relation: BaseRelation = self.Relation.create( schema=relation.schema, identifier=relation.identifier, type=rel_type, @@ -233,15 +232,14 @@ def set_relation_information(self, relation: BaseRelation): is_iceberg=is_iceberg, is_hudi=is_hudi, ) - return relation - + return updated_relation def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]: if not self.Relation.get_default_include_policy().database: database = None # type: ignore relation = super().get_relation(database, schema, identifier) - self.set_relation_information(relation) if relation else None + return self.set_relation_information(relation) if relation else None def parse_describe_extended( self, relation: BaseRelation, raw_rows: AttrDict @@ -519,4 +517,4 @@ def debug_query(self) -> None: diff_count.num_missing as num_mismatched from row_count_diff cross join diff_count -""".strip() \ No newline at end of file +""".strip() From 81877b8dd4b7f403a0cbe452794def7def94bfe2 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 17:37:51 +0530 Subject: [PATCH 3/4] Fixed issue --- dbt/adapters/spark/impl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index adb7d5b51..fc7e66aaa 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -11,7 +11,6 @@ Union, Type, Tuple, - Callable, Set, FrozenSet, TYPE_CHECKING, From 3d46cead8f8fd969f471408c2114c5b8c2d12502 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Mon, 9 Dec 2024 18:32:00 +0530 Subject: [PATCH 4/4] Fixed issue --- dbt/adapters/spark/impl.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index fc7e66aaa..2708a68f8 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -207,15 +207,30 @@ def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[ logger.debug(f"{description} {schema_relation}: {e.msg}") return [] - def set_relation_information(self, relation: BaseRelation) -> BaseRelation: - if relation.information: - return relation - rows: List[agate.Row] = super().get_columns_in_relation(relation) + def _get_relation_information_using_describe(self, relation: BaseRelation) -> RelationInfo: + """Relation info fetched using SHOW TABLES and an auxiliary DESCRIBE statement""" + _schema = relation.schema + name = relation.identifier + table_name = f"{_schema}.{name}" + try: + table_results = self.execute_macro( + DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs={"table_name": table_name} + ) + except DbtRuntimeError as e: + logger.debug(f"Error while retrieving information about {table_name}: {e.msg}") + table_results = AttrDict() + information = "" - for info_row in rows: - info_type, info_value, _ = info_row.values() + for info_row in table_results: + info_type, info_value, _ = info_row if not info_type.startswith("#"): information += f"{info_type}: {info_value}\n" + return _schema, name, information + + def set_relation_information(self, relation: BaseRelation) -> BaseRelation: + if relation.information: + return relation + _schema, name, information = self._get_relation_information_using_describe(relation) rel_type: RelationType = ( RelationType.View if "Type: VIEW" in information else RelationType.Table ) @@ -223,8 +238,8 @@ def set_relation_information(self, relation: BaseRelation) -> BaseRelation: is_hudi: bool = "Provider: hudi" in information is_iceberg: bool = "Provider: iceberg" in information updated_relation: BaseRelation = self.Relation.create( - schema=relation.schema, - identifier=relation.identifier, + schema=_schema, + identifier=name, type=rel_type, information=information, is_delta=is_delta,