
Commit e8679aa

feat: adding first pass of delta lake metadata extractor as well as a sample script on how it would be used. (#351)
Signed-off-by: sshuster <sshuster@edmunds.com>

Squashed follow-up commits:
- Fixing the table last-updated parts
- Adding pyspark dependency
- Fixing formatting for the two delta lake classes
- Adding delta-lake unit tests and fixing delta lake metadata extractor bugs; now a working piece of code
- Adding more typing information and changing formatting
- Fixing column tagging, which doesn't accomplish what we need it to
- Fixing lint
- Fixing the typing
- Fixing lint again
- More documentation
- Adding some TODOs per code review; fixing the sample delta lake metadata script
- Fixing lint
- Removed pyspark from requirements and moved it to extras
- Added some simple parallelism to the scraping
- Fixed a bug that should allow for parsing the describe statement on both Spark 2 and Spark 3
- Fixed a bug that caused the parser to die if the table does not exist
- Fixing another exception; adding spark to all_deps
- Fixed table parsing so that it doesn't try getting table details on views
- Fixed import
- Fixing mypy issues
1 parent 2992618 commit e8679aa

File tree

5 files changed: 672 additions, 1 deletion


README.md

Lines changed: 21 additions & 0 deletions
@@ -176,6 +176,27 @@ If using the filters option here is the input format
]
```

#### [Delta-Lake-MetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/delta_lake_metadata_extractor.py)
An extractor that runs on a Spark cluster and obtains Delta Lake metadata using Spark SQL commands.
This custom solution is currently necessary because the Hive metastore does not contain all of the metadata for Delta Lake tables.
For simplicity, this extractor can also be used for all Hive tables.

Because it must run on a Spark cluster, you need an operator (for example, a [databricks submit run operator](https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/databricks_operator.html)) that runs the extraction code below on that cluster.
```python
spark = SparkSession.builder.appName("Amundsen Delta Lake Metadata Extraction").getOrCreate()
job_config = create_delta_lake_job_config()
dExtractor = DeltaLakeMetadataExtractor()
dExtractor.set_spark(spark)
job = DefaultJob(conf=job_config,
                 task=DefaultTask(extractor=dExtractor, loader=FsNeo4jCSVLoader()),
                 publisher=Neo4jCsvPublisher())
job.launch()
```

You can check out the sample deltalake metadata script for a full example.

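The `create_delta_lake_job_config()` helper in the snippet above is user-supplied. Below is a minimal sketch, assuming the dotted, scoped config-key convention used by databuilder jobs and only the keys defined on `DeltaLakeMetadataExtractor`; the cluster name is a placeholder, and the `FsNeo4jCSVLoader`/`Neo4jCsvPublisher` settings are omitted (see the sample script for a complete, working configuration).

```python
from pyhocon import ConfigFactory


def create_delta_lake_job_config():
    # Keys are scoped by the extractor's get_scope(), 'extractor.delta_lake_table_metadata'.
    # 'my_cluster' is a placeholder; loader and publisher settings are left out of this sketch.
    return ConfigFactory.from_dict({
        'extractor.delta_lake_table_metadata.cluster': 'my_cluster',
        'extractor.delta_lake_table_metadata.database': 'delta-lake',
        'extractor.delta_lake_table_metadata.schema_list': [],
        'extractor.delta_lake_table_metadata.exclude_list': ['tmp'],
        'extractor.delta_lake_table_metadata.delta_tables_only': True,
        # ... plus FsNeo4jCSVLoader and Neo4jCsvPublisher configuration
    })
```
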
#### [DremioMetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/dremio_metadata_extractor.py)
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from [Dremio](https://www.dremio.com).

databuilder/extractor/delta_lake_metadata_extractor.py

Lines changed: 341 additions & 0 deletions
@@ -0,0 +1,341 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime
import logging
from collections import namedtuple

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_last_updated import TableLastUpdated
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from pyhocon import ConfigFactory, ConfigTree  # noqa: F401
from pyspark.sql import SparkSession
from pyspark.sql.catalog import Table
from pyspark.sql.utils import AnalysisException
from typing import Iterator, Union, List, Dict, Optional  # noqa: F401
import concurrent.futures

TableKey = namedtuple('TableKey', ['schema', 'table_name'])

LOGGER = logging.getLogger(__name__)


# TODO once column tags work properly, consider deprecating this for TableMetadata directly
class ScrapedColumnMetadata(object):
    def __init__(self, name: str, data_type: str, description: Optional[str], sort_order: int):
        self.name = name
        self.data_type = data_type
        self.description = description
        self.sort_order = sort_order
        self.is_partition = False
        self.attributes: Dict[str, str] = {}

    def set_is_partition(self, is_partition: bool) -> None:
        self.is_partition = is_partition

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ScrapedColumnMetadata):
            return False
        return self.name == other.name and \
            self.data_type == other.data_type and \
            self.description == other.description and \
            self.sort_order == other.sort_order and \
            self.is_partition == other.is_partition and \
            self.attributes == other.attributes

    def __repr__(self) -> str:
        return "{0}:{1}".format(self.name, self.data_type)


# TODO consider deprecating this for using TableMetadata directly
class ScrapedTableMetadata(object):
    LAST_MODIFIED_KEY = 'lastModified'
    DESCRIPTION_KEY = 'description'
    TABLE_FORMAT_KEY = 'format'

    def __init__(self, schema: str, table: str):
        self.schema: str = schema
        self.table: str = table
        self.table_detail: Optional[Dict] = None
        self.view_detail: Optional[Dict] = None
        self.is_view: bool = False
        self.failed_to_scrape: bool = False
        self.columns: Optional[List[ScrapedColumnMetadata]] = None

    def set_table_detail(self, table_detail: Dict) -> None:
        self.table_detail = table_detail
        self.is_view = False
        self.failed_to_scrape = False

    def set_view_detail(self, view_detail: Dict) -> None:
        self.view_detail = view_detail
        self.is_view = True
        self.failed_to_scrape = False

    def get_details(self) -> Optional[Dict]:
        if self.is_view:
            return self.view_detail
        else:
            return self.table_detail

    def get_full_table_name(self) -> str:
        return self.schema + "." + self.table

    def set_failed_to_scrape(self) -> None:
        self.failed_to_scrape = True

    def set_columns(self, column_list: List[ScrapedColumnMetadata]) -> None:
        self.columns = column_list

    def get_last_modified(self) -> Optional[datetime]:
        details = self.get_details()
        if details and self.LAST_MODIFIED_KEY in details:
            return details[self.LAST_MODIFIED_KEY]
        else:
            return None

    def get_table_description(self) -> Optional[str]:
        details = self.get_details()
        if details and self.DESCRIPTION_KEY in details:
            return details[self.DESCRIPTION_KEY]
        else:
            return None

    def is_delta_table(self) -> bool:
        details = self.get_details()
        if details and self.TABLE_FORMAT_KEY in details:
            return details[self.TABLE_FORMAT_KEY].lower() == 'delta'
        else:
            return False

    def __repr__(self) -> str:
        return "{schema}.{table}".format(schema=self.schema, table=self.table)


class DeltaLakeMetadataExtractor(Extractor):
    """
    Extracts Delta Lake metadata.
    This requires a Spark session whose Hive metastore is populated with all of the Delta tables
    that you are interested in.
    """
    # CONFIG KEYS
    DATABASE_KEY = "database"
    # If you want to exclude specific schemas
    EXCLUDE_LIST_SCHEMAS_KEY = "exclude_list"
    # If you want to only include specific schemas
    SCHEMA_LIST_KEY = "schema_list"
    CLUSTER_KEY = "cluster"
    # By default, this will only process and emit delta-lake tables, but it can support all hive table types.
    DELTA_TABLES_ONLY = "delta_tables_only"
    DEFAULT_CONFIG = ConfigFactory.from_dict({DATABASE_KEY: "delta-lake",
                                              EXCLUDE_LIST_SCHEMAS_KEY: [],
                                              SCHEMA_LIST_KEY: [],
                                              DELTA_TABLES_ONLY: True})
    PARTITION_COLUMN_TAG = 'is_partition'

    def init(self, conf: ConfigTree) -> None:
        self.conf = conf.with_fallback(DeltaLakeMetadataExtractor.DEFAULT_CONFIG)
        self._extract_iter = None  # type: Union[None, Iterator]
        self._cluster = self.conf.get_string(DeltaLakeMetadataExtractor.CLUSTER_KEY)
        self._db = self.conf.get_string(DeltaLakeMetadataExtractor.DATABASE_KEY)
        self.exclude_list = self.conf.get_list(DeltaLakeMetadataExtractor.EXCLUDE_LIST_SCHEMAS_KEY)
        self.schema_list = self.conf.get_list(DeltaLakeMetadataExtractor.SCHEMA_LIST_KEY)
        self.delta_tables_only = self.conf.get_bool(DeltaLakeMetadataExtractor.DELTA_TABLES_ONLY)

    def set_spark(self, spark: SparkSession) -> None:
        self.spark = spark

    def extract(self) -> Union[TableMetadata, TableLastUpdated, None]:
        if not self._extract_iter:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def get_scope(self) -> str:
        return 'extractor.delta_lake_table_metadata'

    def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, None]]:
        """
        Given either a list of schemas to include, or a list of schemas to exclude,
        queries the hive metastore and then the delta log
        to get all of the metadata for your delta tables. It produces:
        - table and column metadata
        - last updated information
        """
        if self.schema_list:
            LOGGER.info("working on {}".format(self.schema_list))
            tables = self.get_all_tables(self.schema_list)
        else:
            LOGGER.info("fetching all schemas")
            LOGGER.info("Excluding: {}".format(self.exclude_list))
            schemas = self.get_schemas(self.exclude_list)
            LOGGER.info("working on {}".format(schemas))
            tables = self.get_all_tables(schemas)
        # TODO add the programmatic information as well?
        # TODO add watermarks
        scraped_tables = self.scrape_all_tables(tables)
        for scraped_table in scraped_tables:
            if not scraped_table:
                continue
            if self.delta_tables_only and not scraped_table.is_delta_table():
                LOGGER.info("Skipping non-delta table {}".format(scraped_table.table))
                continue
            else:
                yield self.create_table_metadata(scraped_table)
                last_updated = self.create_table_last_updated(scraped_table)
                if last_updated:
                    yield last_updated

    def get_schemas(self, exclude_list: List[str]) -> List[str]:
        '''Returns all schemas that are not in the exclude list.'''
        schemas = self.spark.catalog.listDatabases()
        ret = []
        for schema in schemas:
            if schema.name not in exclude_list:
                ret.append(schema.name)
        return ret

    def get_all_tables(self, schemas: List[str]) -> List[Table]:
        '''Returns all tables for the given schemas.'''
        ret = []
        for schema in schemas:
            ret.extend(self.get_tables_for_schema(schema))
        return ret

    def get_tables_for_schema(self, schema: str) -> List[Table]:
        '''Returns all tables for a specific schema.'''
        return self.spark.catalog.listTables(schema)

    def scrape_all_tables(self, tables: List[Table]) -> List[Optional[ScrapedTableMetadata]]:
        '''Scrapes every table in parallel using a thread pool.'''
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self.scrape_table, table) for table in tables]
            scraped_tables = [f.result() for f in futures]
        return scraped_tables

    def scrape_table(self, table: Table) -> Optional[ScrapedTableMetadata]:
        '''Takes a table object and creates a scraped table metadata object.'''
        met = ScrapedTableMetadata(schema=table.database, table=table.name)
        table_name = met.get_full_table_name()
        if table.tableType and table.tableType.lower() != 'view':
            table_detail = self.scrape_table_detail(table_name)
            if table_detail is None:
                LOGGER.error("Failed to parse table " + table_name)
                met.set_failed_to_scrape()
                return None
            else:
                LOGGER.info("Successfully parsed table " + table_name)
                met.set_table_detail(table_detail)
        else:
            view_detail = self.scrape_view_detail(table_name)
            if view_detail is None:
                LOGGER.error("Failed to parse view " + table_name)
                met.set_failed_to_scrape()
                return None
            else:
                LOGGER.info("Successfully parsed view " + table_name)
                met.set_view_detail(view_detail)
        columns = self.fetch_columns(met.schema, met.table)
        if not columns:
            LOGGER.error("Failed to parse columns for " + table_name)
            return None
        else:
            met.set_columns(columns)
            return met

    def scrape_table_detail(self, table_name: str) -> Optional[Dict]:
        try:
            table_details_df = self.spark.sql("describe detail {0}".format(table_name))
            table_detail = table_details_df.collect()[0]
            return table_detail.asDict()
        except Exception as e:
            LOGGER.error(e)
            return None

    def scrape_view_detail(self, view_name: str) -> Optional[Dict]:
        # TODO the blanket try catches need to be changed
        describeExtendedOutput = []
        try:
            describeExtendedOutput = self.spark.sql("describe extended {view_name}"
                                                    .format(view_name=view_name)).collect()
        except Exception as e:
            LOGGER.error(e)
            return None
        view_detail = {}
        startAdding = False
        for row in describeExtendedOutput:
            row_dict = row.asDict()
            if startAdding:
                view_detail[row_dict['col_name']] = row_dict['data_type']
            if "# Detailed Table" in row_dict['col_name']:
                # Then start parsing
                startAdding = True
        return view_detail

    def fetch_columns(self, schema: str, table: str) -> List[ScrapedColumnMetadata]:
        '''Fetches the delta table columns, which in the general case
        unfortunately cannot rely on spark.catalog.listColumns.'''
        raw_columns = []
        try:
            raw_columns = self.spark.sql("describe {0}.{1}".format(schema, table)).collect()
        except AnalysisException as e:
            LOGGER.error(e)
            return raw_columns
        parsed_columns = {}
        partition_cols = False
        sort_order = 0
        for row in raw_columns:
            col_name = row['col_name']
            # NOTE: the behavior of describe has changed between spark 2 and spark 3
            if col_name == '' or '#' in col_name:
                # A blank row or a "# ..." header row marks the start of the partition information section
                partition_cols = True
                continue
            if not partition_cols:
                column = ScrapedColumnMetadata(
                    name=row['col_name'],
                    description=row['comment'] if row['comment'] else None,
                    data_type=row['data_type'],
                    sort_order=sort_order
                )
                parsed_columns[row['col_name']] = column
                sort_order += 1
            else:
                # Rows in the partition section reference an already-parsed column; mark it as a partition column.
                if row['data_type'] in parsed_columns:
                    LOGGER.debug("Adding partition column table for {0}".format(row['data_type']))
                    parsed_columns[row['data_type']].set_is_partition(True)
                elif row['col_name'] in parsed_columns:
                    LOGGER.debug("Adding partition column table for {0}".format(row['col_name']))
                    parsed_columns[row['col_name']].set_is_partition(True)
        return list(parsed_columns.values())

    def create_table_metadata(self, table: ScrapedTableMetadata) -> TableMetadata:
        '''Creates the amundsen table metadata object from the ScrapedTableMetadata object.'''
        amundsen_columns = []
        if table.columns:
            for column in table.columns:
                amundsen_columns.append(
                    ColumnMetadata(name=column.name,
                                   description=column.description,
                                   col_type=column.data_type,
                                   sort_order=column.sort_order)
                )
        description = table.get_table_description()
        return TableMetadata(self._db,
                             self._cluster,
                             table.schema,
                             table.table,
                             description,
                             amundsen_columns,
                             table.is_view)

    def create_table_last_updated(self, table: ScrapedTableMetadata) -> Optional[TableLastUpdated]:
        '''Creates the amundsen table last updated metadata object from the ScrapedTableMetadata object.'''
        last_modified = table.get_last_modified()
        if last_modified:
            return TableLastUpdated(table_name=table.table,
                                    last_updated_time_epoch=int(last_modified.timestamp()),
                                    schema=table.schema,
                                    db=self._db,
                                    cluster=self._cluster)
        else:
            return None
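
For reference, the column-parsing logic above can be exercised locally by stubbing `spark.sql`. The sketch below is illustrative only and is not part of this commit; the row dicts merely mimic the shape of Spark's `describe <schema>.<table>` output, and `MagicMock` stands in for a real `SparkSession`.

```python
# Minimal sketch: run fetch_columns() without a cluster by stubbing spark.sql.
# The rows below are hypothetical and only mimic the shape of `describe` output.
from unittest.mock import MagicMock

from databuilder.extractor.delta_lake_metadata_extractor import DeltaLakeMetadataExtractor

extractor = DeltaLakeMetadataExtractor()
spark = MagicMock()
spark.sql.return_value.collect.return_value = [
    {'col_name': 'id', 'data_type': 'bigint', 'comment': None},
    {'col_name': 'ds', 'data_type': 'string', 'comment': 'partition date'},
    {'col_name': '# Partition Information', 'data_type': '', 'comment': ''},
    {'col_name': '# col_name', 'data_type': 'data_type', 'comment': 'comment'},
    {'col_name': 'ds', 'data_type': 'string', 'comment': 'partition date'},
]
extractor.set_spark(spark)

columns = extractor.fetch_columns('my_schema', 'my_table')
assert [c.name for c in columns] == ['id', 'ds']
assert columns[1].is_partition
```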
