6
6
7
7
from pyhocon import ConfigFactory , ConfigTree
8
8
from typing import Iterator , Union , Dict , Any
9
+ from sqlalchemy .engine .url import make_url
9
10
10
11
from databuilder import Scoped
11
12
from databuilder .extractor .table_metadata_constants import PARTITION_BADGE
@@ -56,6 +57,34 @@ class HiveTableMetadataExtractor(Extractor):
56
57
ORDER by tbl_id, is_partition_col desc;
57
58
"""
58
59
60
# NOTE(review): class attributes of HiveTableMetadataExtractor (class header
# is outside this chunk); diff-paste garbling removed and code reconstructed.
#
# Postgres variant of the metastore extraction query. Postgres folds
# unquoted identifiers to lower case, so every Hive-metastore table and
# column name must be double-quoted, unlike the generic
# DEFAULT_SQL_STATEMENT. The two UNIONed branches fetch partition-key
# columns (is_partition_col = 1) and regular columns (is_partition_col = 0);
# {where_clause_suffix} is substituted once per branch via str.format.
DEFAULT_POSTGRES_SQL_STATEMENT = """
    SELECT source.* FROM
    (SELECT t."TBL_ID" as tbl_id, d."NAME" as "schema", t."TBL_NAME" as name, t."TBL_TYPE",
    tp."PARAM_VALUE" as description, p."PKEY_NAME" as col_name, p."INTEGER_IDX" as col_sort_order,
    p."PKEY_TYPE" as col_type, p."PKEY_COMMENT" as col_description, 1 as "is_partition_col",
    CASE WHEN t."TBL_TYPE" = 'VIRTUAL_VIEW' THEN 1
    ELSE 0 END as "is_view"
    FROM "TBLS" t
    JOIN "DBS" d ON t."DB_ID" = d."DB_ID"
    JOIN "PARTITION_KEYS" p ON t."TBL_ID" = p."TBL_ID"
    LEFT JOIN "TABLE_PARAMS" tp ON (t."TBL_ID" = tp."TBL_ID" AND tp."PARAM_KEY"='comment')
    {where_clause_suffix}
    UNION
    SELECT t."TBL_ID" as tbl_id, d."NAME" as "schema", t."TBL_NAME" as name, t."TBL_TYPE",
    tp."PARAM_VALUE" as description, c."COLUMN_NAME" as col_name, c."INTEGER_IDX" as col_sort_order,
    c."TYPE_NAME" as col_type, c."COMMENT" as col_description, 0 as "is_partition_col",
    CASE WHEN t."TBL_TYPE" = 'VIRTUAL_VIEW' THEN 1
    ELSE 0 END as "is_view"
    FROM "TBLS" t
    JOIN "DBS" d ON t."DB_ID" = d."DB_ID"
    JOIN "SDS" s ON t."SD_ID" = s."SD_ID"
    JOIN "COLUMNS_V2" c ON s."CD_ID" = c."CD_ID"
    LEFT JOIN "TABLE_PARAMS" tp ON (t."TBL_ID" = tp."TBL_ID" AND tp."PARAM_KEY"='comment')
    {where_clause_suffix}
    ) source
    ORDER by tbl_id, is_partition_col desc;
    """

# CONFIG KEYS
# Optional SQL fragment appended to both UNION branches (e.g. a WHERE filter).
WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
# Name of the cluster the extracted metadata is attributed to.
CLUSTER_KEY = 'cluster'
# NOTE(review): reconstructed from a garbled diff hunk — the chunk mixed
# pre-change ('-') and post-change ('+') lines with interleaved numbers.
# This is the coherent post-change method.
def init(self, conf: ConfigTree) -> None:
    """Initialize the extractor from config.

    Builds the metastore SQL statement (choosing the Postgres-quoted or
    generic flavor from the SQLAlchemy connection string), then delegates
    actual row extraction to an embedded SQLAlchemyExtractor.

    :param conf: scoped extractor configuration
    """
    conf = conf.with_fallback(HiveTableMetadataExtractor.DEFAULT_CONFIG)
    self._cluster = '{}'.format(conf.get_string(HiveTableMetadataExtractor.CLUSTER_KEY))

    # The SQLAlchemy extractor is created first so its scoped config (which
    # holds the connection string) can drive the SQL-flavor choice below.
    self._alchemy_extractor = SQLAlchemyExtractor()

    sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())
    default_sql = self._choose_default_sql_stm(sql_alch_conf).format(
        where_clause_suffix=conf.get_string(HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY))

    # An explicitly configured statement (EXTRACT_SQL) overrides the default.
    self.sql_stmt = conf.get_string(HiveTableMetadataExtractor.EXTRACT_SQL, default=default_sql)

    LOGGER.info('SQL for hive metastore: {}'.format(self.sql_stmt))

    # Inject the final statement into the inner extractor's config.
    sql_alch_conf = sql_alch_conf.with_fallback(ConfigFactory.from_dict(
        {SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))
    self._alchemy_extractor.init(sql_alch_conf)
    # Lazily created by extract() on first call.
    self._extract_iter: Union[None, Iterator] = None
114
def _choose_default_sql_stm(self, conf: ConfigTree) -> str:
    """Return the default metastore SQL flavor for the configured backend.

    Parses the SQLAlchemy connection string and, for a Postgres-backed
    metastore, returns the quoted-identifier statement; any other backend
    gets the generic statement.

    :param conf: scoped SQLAlchemyExtractor configuration holding CONN_STRING
    :return: the SQL statement template to format and execute
    """
    conn_string = conf.get_string(SQLAlchemyExtractor.CONN_STRING)
    driver = make_url(conn_string).drivername.lower()
    if driver in ('postgresql', 'postgres'):
        return HiveTableMetadataExtractor.DEFAULT_POSTGRES_SQL_STATEMENT
    return HiveTableMetadataExtractor.DEFAULT_SQL_STATEMENT
120
+
84
121
def extract (self ) -> Union [TableMetadata , None ]:
85
122
if not self ._extract_iter :
86
123
self ._extract_iter = self ._get_extract_iter ()
0 commit comments