1
1
import logging
2
+ from multiprocessing .pool import ThreadPool , TimeoutError
2
3
3
4
from pyhocon import ConfigTree # noqa: F401
4
5
from typing import Any , Optional , List , Iterable # noqa: F401
@@ -20,12 +21,17 @@ class SqlToTblColUsageTransformer(Transformer):
20
21
Currently it's collects on table level that column on same table will be de-duped.
21
22
In many cases, "from" clause does not contain schema and this will be fetched via table name -> schema name mapping
22
23
which it gets from Hive metastore. (Naming collision is disregarded as it needs column level to disambiguate)
24
+
25
+ Currently, ColumnUsageProvider could hang on certain SQL statement and as a short term solution it will timeout
26
+ processing statement at 10 seconds.
23
27
"""
24
28
# Config key
25
29
DATABASE_NAME = 'database'
26
30
CLUSTER_NAME = 'cluster'
27
31
SQL_STATEMENT_ATTRIBUTE_NAME = 'sql_stmt_attribute_name'
28
32
USER_EMAIL_ATTRIBUTE_NAME = 'user_email_attribute_name'
33
+ COLUMN_EXTRACTION_TIMEOUT_SEC = 'column_extraction_timeout_seconds'
34
+ LOG_ALL_EXTRACTION_FAILURES = 'log_all_extraction_failures'
29
35
30
36
total_counts = 0
31
37
failure_counts = 0
@@ -38,6 +44,11 @@ def init(self, conf):
38
44
self ._sql_stmt_attr = conf .get_string (SqlToTblColUsageTransformer .SQL_STATEMENT_ATTRIBUTE_NAME )
39
45
self ._user_email_attr = conf .get_string (SqlToTblColUsageTransformer .USER_EMAIL_ATTRIBUTE_NAME )
40
46
self ._tbl_to_schema_mapping = self ._create_schema_by_table_mapping ()
47
+ self ._worker_pool = ThreadPool (processes = 1 )
48
+ self ._time_out_sec = conf .get_int (SqlToTblColUsageTransformer .COLUMN_EXTRACTION_TIMEOUT_SEC , 10 )
49
+ LOGGER .info ('Column extraction timeout: {} seconds' .format (self ._time_out_sec ))
50
+ self ._log_all_extraction_failures = conf .get_bool (SqlToTblColUsageTransformer .LOG_ALL_EXTRACTION_FAILURES ,
51
+ False )
41
52
42
53
def transform (self , record ):
43
54
# type: (Any) -> Optional[TableColumnUsage]
@@ -48,11 +59,20 @@ def transform(self, record):
48
59
49
60
result = [] # type: List[ColumnReader]
50
61
try :
51
- columns = ColumnUsageProvider .get_columns ( query = stmt )
62
+ columns = self . _worker_pool . apply_async ( ColumnUsageProvider .get_columns , ( stmt ,)). get ( self . _time_out_sec )
52
63
# LOGGER.info('Statement: {} ---> columns: {}'.format(stmt, columns))
64
+ except TimeoutError :
65
+ SqlToTblColUsageTransformer .failure_counts += 1
66
+ LOGGER .exception ('Timed out while getting column usage from query: {}' .format (stmt ))
67
+ LOGGER .info ('Killing the thread.' )
68
+ self ._worker_pool .terminate ()
69
+ self ._worker_pool = ThreadPool (processes = 1 )
70
+ LOGGER .info ('Killed the thread.' )
71
+ return None
53
72
except Exception :
54
73
SqlToTblColUsageTransformer .failure_counts += 1
55
- LOGGER .exception ('Failed to get column usage from query: {}' .format (stmt ))
74
+ if self ._log_all_extraction_failures :
75
+ LOGGER .exception ('Failed to get column usage from query: {}' .format (stmt ))
56
76
return None
57
77
58
78
# Dedupe is needed to make it table level. TODO: Remove this once we are at column level
0 commit comments