
Commit 2eff7ee

Added collection of used tables from Python notebooks and files and SQL queries (#2772)

## Changes

Users need the ability to track legacy table usage and lineage. This PR collects table info and stores it as part of the linting jobs. It will be followed by two PRs: one for usage in queries, and one for displaying results in the assessment dashboard.

### Linked issues

None

### Functionality

- [x] modified existing workflow: `assessment`
- [x] added new tables and views

### Tests

- [x] added unit tests
- [x] updated integration tests

Co-authored-by: Eric Vergnaud <eric.vergnaud@databricks.com>

1 parent 437cf6b

File tree

20 files changed: +688 −222 lines changed


src/databricks/labs/ucx/install.py

Lines changed: 4 additions & 0 deletions

```diff
@@ -75,6 +75,7 @@
 from databricks.labs.ucx.installer.workflows import WorkflowsDeployment
 from databricks.labs.ucx.recon.migration_recon import ReconResult
 from databricks.labs.ucx.runtime import Workflows
+from databricks.labs.ucx.source_code.base import UsedTable
 from databricks.labs.ucx.source_code.directfs_access import DirectFsAccess
 from databricks.labs.ucx.source_code.jobs import JobProblem
 from databricks.labs.ucx.source_code.queries import QueryProblem
@@ -124,6 +125,8 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
             functools.partial(table, "recon_results", ReconResult),
             functools.partial(table, "directfs_in_paths", DirectFsAccess),
             functools.partial(table, "directfs_in_queries", DirectFsAccess),
+            functools.partial(table, "used_tables_in_paths", UsedTable),
+            functools.partial(table, "used_tables_in_queries", UsedTable),
         ],
     )
     deployer.deploy_view("grant_detail", "queries/views/grant_detail.sql")
@@ -133,6 +136,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
     deployer.deploy_view("code_patterns", "queries/views/code_patterns.sql")
     deployer.deploy_view("reconciliation_results", "queries/views/reconciliation_results.sql")
     deployer.deploy_view("directfs", "queries/views/directfs.sql")
+    deployer.deploy_view("used_tables", "queries/views/used_tables.sql")
 
 
 def extract_major_minor(version_string):
```
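A hedged sketch of how the extended `deploy_schema` could be exercised in isolation. `MockBackend` ships with databricks.labs.lsql for tests; whether `deploy_schema` runs cleanly against it depends on the rest of the installer, so treat this as an illustration of the call shape rather than a verified recipe:

```python
# Assumption: databricks.labs.lsql's MockBackend and the ucx package from
# this commit are importable; names mirror the diff above.
from databricks.labs.lsql.backends import MockBackend

from databricks.labs.ucx.install import deploy_schema

backend = MockBackend()
deploy_schema(backend, "inventory")

# MockBackend records every statement it executed, so the DDL for the new
# used_tables_in_paths / used_tables_in_queries tables should appear here
for query in backend.queries:
    print(query)
```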
src/databricks/labs/ucx/queries/views/used_tables.sql

Lines changed: 25 additions & 0 deletions

```diff
@@ -0,0 +1,25 @@
+SELECT
+    catalog_name,
+    schema_name,
+    table_name,
+    is_read,
+    is_write,
+    source_id,
+    source_timestamp,
+    source_lineage,
+    assessment_start_timestamp,
+    assessment_end_timestamp
+FROM $inventory.used_tables_in_paths
+UNION ALL
+SELECT
+    catalog_name,
+    schema_name,
+    table_name,
+    is_read,
+    is_write,
+    source_id,
+    source_timestamp,
+    source_lineage,
+    assessment_start_timestamp,
+    assessment_end_timestamp
+FROM $inventory.used_tables_in_queries
```
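The view unions the two per-source tables so downstream consumers read a single relation. A hypothetical helper (not part of this commit) for reading it; `sql_backend` stands for any databricks.labs.lsql `SqlBackend`, and the code assumes the deployer substituted `$inventory` with the concrete schema name:

```python
from collections.abc import Iterable

from databricks.labs.lsql.backends import SqlBackend


def read_used_tables(sql_backend: SqlBackend, inventory_schema: str) -> Iterable:
    """Hypothetical helper: stream rows from the deployed used_tables view."""
    # $inventory was replaced with the real schema name at deploy time,
    # so the view is addressed as <schema>.used_tables at query time
    yield from sql_backend.fetch(f"SELECT * FROM {inventory_schema}.used_tables")
```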

src/databricks/labs/ucx/source_code/base.py

Lines changed: 193 additions & 5 deletions

```diff
@@ -4,10 +4,13 @@
 import dataclasses
 import locale
 import logging
+import sys
 from abc import abstractmethod, ABC
 from collections.abc import Iterable
-from dataclasses import dataclass
+from dataclasses import dataclass, field
+from datetime import datetime
 from pathlib import Path
+from typing import Any
 
 from astroid import AstroidSyntaxError, NodeNG  # type: ignore
 from sqlglot import Expression, parse as parse_sql, ParseError as SqlParseError
@@ -19,6 +22,11 @@
 
 from databricks.labs.ucx.source_code.python.python_ast import Tree
 
+if sys.version_info >= (3, 11):
+    from typing import Self
+else:
+    from typing_extensions import Self
+
 # Code mapping between LSP, PyLint, and our own diagnostics:
 # | LSP                       | PyLint     | Our            |
 # |---------------------------|------------|----------------|
@@ -174,6 +182,140 @@ def name(self) -> str: ...
     def apply(self, code: str) -> str: ...
 
 
+@dataclass
+class LineageAtom:
+
+    object_type: str
+    object_id: str
+    other: dict[str, str] | None = None
+
+
+@dataclass
+class SourceInfo:
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> Self:
+        source_lineage = data.get("source_lineage", None)
+        if isinstance(source_lineage, list) and len(source_lineage) > 0 and isinstance(source_lineage[0], dict):
+            lineage_atoms = [LineageAtom(**lineage) for lineage in source_lineage]
+            data["source_lineage"] = lineage_atoms
+        return cls(**data)
+
+    UNKNOWN = "unknown"
+
+    source_id: str = UNKNOWN
+    source_timestamp: datetime = datetime.fromtimestamp(0)
+    source_lineage: list[LineageAtom] = field(default_factory=list)
+    assessment_start_timestamp: datetime = datetime.fromtimestamp(0)
+    assessment_end_timestamp: datetime = datetime.fromtimestamp(0)
+
+    def replace_source(
+        self,
+        source_id: str | None = None,
+        source_lineage: list[LineageAtom] | None = None,
+        source_timestamp: datetime | None = None,
+    ):
+        return dataclasses.replace(
+            self,
+            source_id=source_id or self.source_id,
+            source_timestamp=source_timestamp or self.source_timestamp,
+            source_lineage=source_lineage or self.source_lineage,
+        )
+
+    def replace_assessment_infos(
+        self, assessment_start: datetime | None = None, assessment_end: datetime | None = None
+    ):
+        return dataclasses.replace(
+            self,
+            assessment_start_timestamp=assessment_start or self.assessment_start_timestamp,
+            assessment_end_timestamp=assessment_end or self.assessment_end_timestamp,
+        )
+
+
+@dataclass
+class UsedTable(SourceInfo):
+
+    @classmethod
+    def parse(cls, value: str, default_schema: str) -> UsedTable:
+        parts = value.split(".")
+        if len(parts) >= 3:
+            catalog_name = parts.pop(0)
+        else:
+            catalog_name = "hive_metastore"
+        if len(parts) >= 2:
+            schema_name = parts.pop(0)
+        else:
+            schema_name = default_schema
+        return UsedTable(catalog_name=catalog_name, schema_name=schema_name, table_name=parts[0])
+
+    catalog_name: str = SourceInfo.UNKNOWN
+    schema_name: str = SourceInfo.UNKNOWN
+    table_name: str = SourceInfo.UNKNOWN
+    is_read: bool = True
+    is_write: bool = False
+
+
+class TableCollector(ABC):
+
+    @abstractmethod
+    def collect_tables(self, source_code: str) -> Iterable[UsedTable]: ...
+
+
+@dataclass
+class TableInfoNode:
+    table: UsedTable
+    node: NodeNG
+
+
+class TablePyCollector(TableCollector, ABC):
+
+    def collect_tables(self, source_code: str):
+        tree = Tree.normalize_and_parse(source_code)
+        for table_node in self.collect_tables_from_tree(tree):
+            yield table_node.table
+
+    @abstractmethod
+    def collect_tables_from_tree(self, tree: Tree) -> Iterable[TableInfoNode]: ...
+
+
+class TableSqlCollector(TableCollector, ABC): ...
+
+
+@dataclass
+class DirectFsAccess(SourceInfo):
+    """A record describing a Direct File System Access"""
+
+    path: str = SourceInfo.UNKNOWN
+    is_read: bool = False
+    is_write: bool = False
+
+
+@dataclass
+class DirectFsAccessNode:
+    dfsa: DirectFsAccess
+    node: NodeNG
+
+
+class DfsaCollector(ABC):
+
+    @abstractmethod
+    def collect_dfsas(self, source_code: str) -> Iterable[DirectFsAccess]: ...
+
+
+class DfsaPyCollector(DfsaCollector, ABC):
+
+    def collect_dfsas(self, source_code: str) -> Iterable[DirectFsAccess]:
+        tree = Tree.normalize_and_parse(source_code)
+        for dfsa_node in self.collect_dfsas_from_tree(tree):
+            yield dfsa_node.dfsa
+
+    @abstractmethod
+    def collect_dfsas_from_tree(self, tree: Tree) -> Iterable[DirectFsAccessNode]: ...
+
+
+class DfsaSqlCollector(DfsaCollector, ABC): ...
+
+
 # The default schema to use when the schema is not specified in a table reference
 # See: https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-qry-select-usedb.html
 DEFAULT_CATALOG = 'hive_metastore'
@@ -221,20 +363,42 @@ def parse_security_mode(mode_str: str | None) -> compute.DataSecurityMode | None
     return None
 
 
-class SqlSequentialLinter(SqlLinter):
+class SqlSequentialLinter(SqlLinter, DfsaCollector, TableCollector):
 
-    def __init__(self, linters: list[SqlLinter]):
+    def __init__(
+        self,
+        linters: list[SqlLinter],
+        dfsa_collectors: list[DfsaSqlCollector],
+        table_collectors: list[TableSqlCollector],
+    ):
         self._linters = linters
+        self._dfsa_collectors = dfsa_collectors
+        self._table_collectors = table_collectors
 
     def lint_expression(self, expression: Expression) -> Iterable[Advice]:
         for linter in self._linters:
             yield from linter.lint_expression(expression)
 
+    def collect_dfsas(self, source_code: str) -> Iterable[DirectFsAccess]:
+        for collector in self._dfsa_collectors:
+            yield from collector.collect_dfsas(source_code)
+
+    def collect_tables(self, source_code: str) -> Iterable[UsedTable]:
+        for collector in self._table_collectors:
+            yield from collector.collect_tables(source_code)
 
-class PythonSequentialLinter(Linter):
+class PythonSequentialLinter(Linter, DfsaCollector, TableCollector):
 
-    def __init__(self, linters: list[PythonLinter]):
+    def __init__(
+        self,
+        linters: list[PythonLinter],
+        dfsa_collectors: list[DfsaPyCollector],
+        table_collectors: list[TablePyCollector],
+    ):
         self._linters = linters
+        self._dfsa_collectors = dfsa_collectors
+        self._table_collectors = table_collectors
         self._tree: Tree | None = None
 
     def lint(self, code: str) -> Iterable[Advice]:
@@ -271,6 +435,30 @@ def process_child_cell(self, code: str):
         # error already reported when linting enclosing notebook
         logger.warning(f"Failed to parse Python cell: {code}", exc_info=e)
 
+    def collect_dfsas(self, source_code: str) -> Iterable[DirectFsAccess]:
+        try:
+            tree = self._parse_and_append(source_code)
+            for dfsa_node in self.collect_dfsas_from_tree(tree):
+                yield dfsa_node.dfsa
+        except AstroidSyntaxError as e:
+            logger.warning('syntax-error', exc_info=e)
+
+    def collect_dfsas_from_tree(self, tree: Tree) -> Iterable[DirectFsAccessNode]:
+        for collector in self._dfsa_collectors:
+            yield from collector.collect_dfsas_from_tree(tree)
+
+    def collect_tables(self, source_code: str) -> Iterable[UsedTable]:
+        try:
+            tree = self._parse_and_append(source_code)
+            for table_node in self.collect_tables_from_tree(tree):
+                yield table_node.table
+        except AstroidSyntaxError as e:
+            logger.warning('syntax-error', exc_info=e)
+
+    def collect_tables_from_tree(self, tree: Tree) -> Iterable[TableInfoNode]:
+        for collector in self._table_collectors:
+            yield from collector.collect_tables_from_tree(tree)
+
     def _make_tree(self) -> Tree:
         if self._tree is None:
             self._tree = Tree.new_module()
```
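A short usage sketch grounded in the definitions above, assuming the ucx package from this commit is importable; the empty-collector linter is illustrative only:

```python
from databricks.labs.ucx.source_code.base import LineageAtom, PythonSequentialLinter, UsedTable

# parse() fills in the catalog and schema when the reference is not fully qualified
table = UsedTable.parse("orders", default_schema="sales")
assert (table.catalog_name, table.schema_name, table.table_name) == ("hive_metastore", "sales", "orders")

# records inherit lineage and timestamp handling from SourceInfo
table = table.replace_source(
    source_id="jobs/123/notebook",  # hypothetical source identifier
    source_lineage=[LineageAtom(object_type="NOTEBOOK", object_id="jobs/123/notebook")],
)

# a sequential linter with no registered collectors parses the code and yields nothing
linter = PythonSequentialLinter([], [], [])
assert list(linter.collect_tables("print('hello')")) == []
```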

src/databricks/labs/ucx/source_code/directfs_access.py

Lines changed: 1 addition & 65 deletions

```diff
@@ -1,82 +1,18 @@
 from __future__ import annotations
 
-import dataclasses
 import logging
-import sys
 from collections.abc import Sequence, Iterable
-from dataclasses import dataclass, field
-from datetime import datetime
-from typing import Any
 
 from databricks.labs.ucx.framework.crawlers import CrawlerBase
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk.errors import DatabricksError
 
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
-
-if sys.version_info >= (3, 11):
-    from typing import Self
-else:
-    from typing_extensions import Self
-
+from databricks.labs.ucx.source_code.base import DirectFsAccess
 
 logger = logging.getLogger(__name__)
 
 
-@dataclass
-class LineageAtom:
-
-    object_type: str
-    object_id: str
-    other: dict[str, str] | None = None
-
-
-@dataclass
-class DirectFsAccess:
-    """A record describing a Direct File System Access"""
-
-    @classmethod
-    def from_dict(cls, data: dict[str, Any]) -> Self:
-        source_lineage = data.get("source_lineage", None)
-        if isinstance(source_lineage, list) and len(source_lineage) > 0 and isinstance(source_lineage[0], dict):
-            lineage_atoms = [LineageAtom(**lineage) for lineage in source_lineage]
-            data["source_lineage"] = lineage_atoms
-        return cls(**data)
-
-    UNKNOWN = "unknown"
-
-    path: str
-    is_read: bool
-    is_write: bool
-    source_id: str = UNKNOWN
-    source_timestamp: datetime = datetime.fromtimestamp(0)
-    source_lineage: list[LineageAtom] = field(default_factory=list)
-    assessment_start_timestamp: datetime = datetime.fromtimestamp(0)
-    assessment_end_timestamp: datetime = datetime.fromtimestamp(0)
-
-    def replace_source(
-        self,
-        source_id: str | None = None,
-        source_lineage: list[LineageAtom] | None = None,
-        source_timestamp: datetime | None = None,
-    ):
-        return dataclasses.replace(
-            self,
-            source_id=source_id or self.source_id,
-            source_timestamp=source_timestamp or self.source_timestamp,
-            source_lineage=source_lineage or self.source_lineage,
-        )
-
-    def replace_assessment_infos(
-        self, assessment_start: datetime | None = None, assessment_end: datetime | None = None
-    ):
-        return dataclasses.replace(
-            self,
-            assessment_start_timestamp=assessment_start or self.assessment_start_timestamp,
-            assessment_end_timestamp=assessment_end or self.assessment_end_timestamp,
-        )
-
-
 class DirectFsAccessCrawler(CrawlerBase[DirectFsAccess]):
 
     @classmethod
```
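Since `DirectFsAccess` now inherits from `SourceInfo`, deserialization of stored rows goes through the shared `from_dict`, which rebuilds nested `LineageAtom` records. A small sketch (assumes the ucx package from this commit is importable; the values are made up):

```python
from databricks.labs.ucx.source_code.base import DirectFsAccess

# from_dict() (inherited from SourceInfo) converts the raw lineage dicts
# back into LineageAtom instances before constructing the record
record = DirectFsAccess.from_dict(
    {
        "path": "dbfs:/mnt/raw/events.csv",  # hypothetical path
        "is_read": True,
        "is_write": False,
        "source_id": "notebooks/etl",
        "source_lineage": [{"object_type": "NOTEBOOK", "object_id": "notebooks/etl"}],
    }
)
assert record.source_lineage[0].object_id == "notebooks/etl"
```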

src/databricks/labs/ucx/source_code/graph.py

Lines changed: 1 addition & 2 deletions

```diff
@@ -12,8 +12,7 @@
 from astroid import (  # type: ignore
     NodeNG,
 )
-from databricks.labs.ucx.source_code.base import Advisory, CurrentSessionState, is_a_notebook
-from databricks.labs.ucx.source_code.directfs_access import LineageAtom
+from databricks.labs.ucx.source_code.base import Advisory, CurrentSessionState, is_a_notebook, LineageAtom
 from databricks.labs.ucx.source_code.python.python_ast import Tree
 from databricks.labs.ucx.source_code.path_lookup import PathLookup
```