Skip to content

Commit ec5bc48

Browse files
authored
collect used tables as part of running report (#2835)
## Changes Similarly to what is done for workflows, collect used tables as part of running report ### Linked issues None ### Functionality - [ ] modified existing command: `databricks labs ucx install`, now collects tables from dashboards as part of the assessment ### Tests - [x] added integration tests --------- Co-authored-by: Eric Vergnaud <eric.vergnaud@databricks.com>
1 parent 50163b9 commit ec5bc48

File tree

4 files changed

+154
-63
lines changed

4 files changed

+154
-63
lines changed

src/databricks/labs/ucx/contexts/application.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ def query_linter(self) -> QueryLinter:
449449
self.workspace_client,
450450
TableMigrationIndex([]), # TODO: bring back self.tables_migrator.index()
451451
self.directfs_access_crawler_for_queries,
452+
self.used_tables_crawler_for_queries,
452453
self.config.include_dashboard_ids,
453454
)
454455

src/databricks/labs/ucx/source_code/queries.py

Lines changed: 94 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import dataclasses
22
import logging
33
from collections.abc import Iterable
4-
from dataclasses import asdict, dataclass
4+
from dataclasses import asdict, dataclass, field
55
from datetime import datetime, timezone
66

77
from databricks.sdk import WorkspaceClient
@@ -12,11 +12,11 @@
1212

1313
from databricks.labs.ucx.framework.utils import escape_sql_identifier
1414
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
15-
from databricks.labs.ucx.source_code.base import CurrentSessionState, LineageAtom
15+
from databricks.labs.ucx.source_code.base import CurrentSessionState, LineageAtom, UsedTable
1616
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler, DirectFsAccess
1717
from databricks.labs.ucx.source_code.linters.context import LinterContext
18-
from databricks.labs.ucx.source_code.linters.directfs import DirectFsAccessSqlLinter
1918
from databricks.labs.ucx.source_code.redash import Redash
19+
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
2020

2121
logger = logging.getLogger(__name__)
2222

@@ -33,60 +33,92 @@ class QueryProblem:
3333
message: str
3434

3535

36+
@dataclass
37+
class _ReportingContext:
38+
linted_queries: set[str] = field(default_factory=set)
39+
all_problems: list[QueryProblem] = field(default_factory=list)
40+
all_dfsas: list[DirectFsAccess] = field(default_factory=list)
41+
all_tables: list[UsedTable] = field(default_factory=list)
42+
43+
3644
class QueryLinter:
3745

3846
def __init__(
3947
self,
4048
ws: WorkspaceClient,
4149
migration_index: TableMigrationIndex,
4250
directfs_crawler: DirectFsAccessCrawler,
51+
used_tables_crawler: UsedTablesCrawler,
4352
include_dashboard_ids: list[str] | None,
4453
):
4554
self._ws = ws
4655
self._migration_index = migration_index
4756
self._directfs_crawler = directfs_crawler
57+
self._used_tables_crawler = used_tables_crawler
4858
self._include_dashboard_ids = include_dashboard_ids
4959

5060
def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
5161
assessment_start = datetime.now(timezone.utc)
52-
dashboard_ids = self._dashboard_ids_in_scope()
53-
logger.info(f"Running {len(dashboard_ids)} linting tasks...")
54-
linted_queries: set[str] = set()
55-
all_problems: list[QueryProblem] = []
56-
all_dfsas: list[DirectFsAccess] = []
57-
# first lint and collect queries from dashboards
58-
for dashboard_id in dashboard_ids:
59-
dashboard = self._ws.dashboards.get(dashboard_id=dashboard_id)
60-
problems, dfsas = self._lint_and_collect_from_dashboard(dashboard, linted_queries)
61-
all_problems.extend(problems)
62-
all_dfsas.extend(dfsas)
63-
for query in self._queries_in_scope():
64-
if query.id in linted_queries:
65-
continue
66-
linted_queries.add(query.id)
67-
problems = self.lint_query(query)
68-
all_problems.extend(problems)
69-
dfsas = self.collect_dfsas_from_query("no-dashboard-id", query)
70-
all_dfsas.extend(dfsas)
71-
# dump problems
72-
logger.info(f"Saving {len(all_problems)} linting problems...")
62+
context = _ReportingContext()
63+
self._lint_dashboards(context)
64+
self._lint_queries(context)
65+
assessment_end = datetime.now(timezone.utc)
66+
self._dump_problems(context, sql_backend, inventory_database)
67+
self._dump_dfsas(context, assessment_start, assessment_end)
68+
self._dump_used_tables(context, assessment_start, assessment_end)
69+
70+
def _dump_problems(self, context: _ReportingContext, sql_backend: SqlBackend, inventory_database: str):
71+
logger.info(f"Saving {len(context.all_problems)} linting problems...")
7372
sql_backend.save_table(
7473
f'{escape_sql_identifier(inventory_database)}.query_problems',
75-
all_problems,
74+
context.all_problems,
7675
QueryProblem,
7776
mode='overwrite',
7877
)
79-
# dump dfsas
80-
assessment_end = datetime.now(timezone.utc)
81-
processed = []
82-
for dfsa in all_dfsas:
78+
79+
def _dump_dfsas(self, context: _ReportingContext, assessment_start: datetime, assessment_end: datetime):
80+
processed_dfsas = []
81+
for dfsa in context.all_dfsas:
8382
dfsa = dataclasses.replace(
8483
dfsa,
8584
assessment_start_timestamp=assessment_start,
8685
assessment_end_timestamp=assessment_end,
8786
)
88-
processed.append(dfsa)
89-
self._directfs_crawler.dump_all(processed)
87+
processed_dfsas.append(dfsa)
88+
self._directfs_crawler.dump_all(processed_dfsas)
89+
90+
def _dump_used_tables(self, context: _ReportingContext, assessment_start: datetime, assessment_end: datetime):
91+
processed_tables = []
92+
for table in context.all_tables:
93+
table = dataclasses.replace(
94+
table,
95+
assessment_start_timestamp=assessment_start,
96+
assessment_end_timestamp=assessment_end,
97+
)
98+
processed_tables.append(table)
99+
self._used_tables_crawler.dump_all(processed_tables)
100+
101+
def _lint_dashboards(self, context: _ReportingContext):
102+
dashboard_ids = self._dashboard_ids_in_scope()
103+
logger.info(f"Running {len(dashboard_ids)} linting tasks...")
104+
for dashboard_id in dashboard_ids:
105+
dashboard = self._ws.dashboards.get(dashboard_id=dashboard_id)
106+
problems, dfsas, tables = self._lint_and_collect_from_dashboard(dashboard, context.linted_queries)
107+
context.all_problems.extend(problems)
108+
context.all_dfsas.extend(dfsas)
109+
context.all_tables.extend(tables)
110+
111+
def _lint_queries(self, context: _ReportingContext):
112+
for query in self._queries_in_scope():
113+
if query.id in context.linted_queries:
114+
continue
115+
context.linted_queries.add(query.id)
116+
problems = self.lint_query(query)
117+
context.all_problems.extend(problems)
118+
dfsas = self.collect_dfsas_from_query("no-dashboard-id", query)
119+
context.all_dfsas.extend(dfsas)
120+
tables = self.collect_used_tables_from_query("no-dashboard-id", query)
121+
context.all_tables.extend(tables)
90122

91123
def _dashboard_ids_in_scope(self) -> list[str]:
92124
if self._include_dashboard_ids is not None: # an empty list is accepted
@@ -102,10 +134,11 @@ def _queries_in_scope(self):
102134

103135
def _lint_and_collect_from_dashboard(
104136
self, dashboard: Dashboard, linted_queries: set[str]
105-
) -> tuple[Iterable[QueryProblem], Iterable[DirectFsAccess]]:
137+
) -> tuple[Iterable[QueryProblem], Iterable[DirectFsAccess], Iterable[UsedTable]]:
106138
dashboard_queries = Redash.get_queries_from_dashboard(dashboard)
107139
query_problems: list[QueryProblem] = []
108140
query_dfsas: list[DirectFsAccess] = []
141+
query_tables: list[UsedTable] = []
109142
dashboard_id = dashboard.id or "<no-id>"
110143
dashboard_parent = dashboard.parent or "<orphan>"
111144
dashboard_name = dashboard.name or "<anonymous>"
@@ -134,7 +167,16 @@ def _lint_and_collect_from_dashboard(
134167
)
135168
source_lineage = [atom] + dfsa.source_lineage
136169
query_dfsas.append(dataclasses.replace(dfsa, source_lineage=source_lineage))
137-
return query_problems, query_dfsas
170+
tables = self.collect_used_tables_from_query(dashboard_id, query)
171+
for table in tables:
172+
atom = LineageAtom(
173+
object_type="DASHBOARD",
174+
object_id=dashboard_id,
175+
other={"parent": dashboard_parent, "name": dashboard_name},
176+
)
177+
source_lineage = [atom] + table.source_lineage
178+
query_tables.append(dataclasses.replace(table, source_lineage=source_lineage))
179+
return query_problems, query_dfsas, query_tables
138180

139181
def lint_query(self, query: LegacyQuery) -> Iterable[QueryProblem]:
140182
if not query.query:
@@ -156,20 +198,34 @@ def lint_query(self, query: LegacyQuery) -> Iterable[QueryProblem]:
156198
message=advice.message,
157199
)
158200

159-
@classmethod
160-
def collect_dfsas_from_query(cls, dashboard_id: str, query: LegacyQuery) -> Iterable[DirectFsAccess]:
201+
def collect_dfsas_from_query(self, dashboard_id: str, query: LegacyQuery) -> Iterable[DirectFsAccess]:
161202
if query.query is None:
162203
return
163-
linter = DirectFsAccessSqlLinter()
204+
ctx = LinterContext(self._migration_index, CurrentSessionState())
205+
collector = ctx.dfsa_collector(Language.SQL)
164206
source_id = f"{dashboard_id}/{query.id}"
165207
source_name = query.name or "<anonymous>"
166-
source_timestamp = cls._read_timestamp(query.updated_at)
208+
source_timestamp = self._read_timestamp(query.updated_at)
167209
source_lineage = [LineageAtom(object_type="QUERY", object_id=source_id, other={"name": source_name})]
168-
for dfsa in linter.collect_dfsas(query.query):
210+
for dfsa in collector.collect_dfsas(query.query):
169211
yield DirectFsAccess(**asdict(dfsa)).replace_source(
170212
source_id=source_id, source_timestamp=source_timestamp, source_lineage=source_lineage
171213
)
172214

215+
def collect_used_tables_from_query(self, dashboard_id: str, query: LegacyQuery) -> Iterable[UsedTable]:
216+
if query.query is None:
217+
return
218+
ctx = LinterContext(self._migration_index, CurrentSessionState())
219+
collector = ctx.tables_collector(Language.SQL)
220+
source_id = f"{dashboard_id}/{query.id}"
221+
source_name = query.name or "<anonymous>"
222+
source_timestamp = self._read_timestamp(query.updated_at)
223+
source_lineage = [LineageAtom(object_type="QUERY", object_id=source_id, other={"name": source_name})]
224+
for table in collector.collect_tables(query.query):
225+
yield UsedTable(**asdict(table)).replace_source(
226+
source_id=source_id, source_timestamp=source_timestamp, source_lineage=source_lineage
227+
)
228+
173229
@classmethod
174230
def _read_timestamp(cls, timestamp: str | None) -> datetime:
175231
if timestamp is not None:
Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,58 @@
11
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
22
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
33
from databricks.labs.ucx.source_code.queries import QueryLinter
4+
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
45

56

6-
def test_query_linter_lints_queries_and_stores_dfsas(simple_ctx, ws, sql_backend, make_query, make_dashboard):
7-
query = make_query(sql_query="SELECT * from csv.`dbfs://some_folder/some_file.csv`")
8-
_dashboard = make_dashboard(query=query)
9-
linter = QueryLinter(ws, TableMigrationIndex([]), simple_ctx.directfs_access_crawler_for_queries, None)
7+
def test_query_linter_lints_queries_and_stores_dfsas_and_tables(
8+
simple_ctx, ws, sql_backend, make_query, make_dashboard
9+
):
10+
queries = [make_query(sql_query="SELECT * from csv.`dbfs://some_folder/some_file.csv`")]
11+
dashboards = [make_dashboard(query=queries[0])]
12+
queries.append(make_query(sql_query="SELECT * from some_schema.some_table"))
13+
dashboards.append(make_dashboard(query=queries[1]))
14+
linter = QueryLinter(
15+
ws,
16+
TableMigrationIndex([]),
17+
simple_ctx.directfs_access_crawler_for_queries,
18+
simple_ctx.used_tables_crawler_for_queries,
19+
None,
20+
)
1021
linter.refresh_report(sql_backend, simple_ctx.inventory_database)
1122
all_problems = sql_backend.fetch("SELECT * FROM query_problems", schema=simple_ctx.inventory_database)
12-
problems = [row for row in all_problems if row["query_name"] == query.name]
23+
problems = [row for row in all_problems if row["query_name"] == queries[0].name]
1324
assert len(problems) == 1
14-
crawler = DirectFsAccessCrawler.for_queries(sql_backend, simple_ctx.inventory_database)
15-
all_dfsas = crawler.snapshot()
16-
source_id = f"{_dashboard.id}/{query.id}"
25+
dfsa_crawler = DirectFsAccessCrawler.for_queries(sql_backend, simple_ctx.inventory_database)
26+
all_dfsas = dfsa_crawler.snapshot()
27+
source_id = f"{dashboards[0].id}/{queries[0].id}"
1728
dfsas = [dfsa for dfsa in all_dfsas if dfsa.source_id == source_id]
1829
assert len(dfsas) == 1
19-
dfsa = dfsas[0]
20-
assert len(dfsa.source_lineage) == 2
21-
lineage = dfsa.source_lineage[0]
30+
assert len(dfsas[0].source_lineage) == 2
31+
lineage = dfsas[0].source_lineage[0]
2232
assert lineage.object_type == "DASHBOARD"
23-
assert lineage.object_id == _dashboard.id
33+
assert lineage.object_id == dashboards[0].id
2434
assert lineage.other
25-
assert lineage.other.get("parent", None) == _dashboard.parent
26-
assert lineage.other.get("name", None) == _dashboard.name
27-
lineage = dfsa.source_lineage[1]
35+
assert lineage.other.get("parent", None) == dashboards[0].parent
36+
assert lineage.other.get("name", None) == dashboards[0].name
37+
lineage = dfsas[0].source_lineage[1]
2838
assert lineage.object_type == "QUERY"
2939
assert lineage.object_id == source_id
3040
assert lineage.other
31-
assert lineage.other.get("name", None) == query.name
41+
assert lineage.other.get("name", None) == queries[0].name
42+
used_tables_crawler = UsedTablesCrawler.for_queries(sql_backend, simple_ctx.inventory_database)
43+
all_tables = used_tables_crawler.snapshot()
44+
source_id = f"{dashboards[1].id}/{queries[1].id}"
45+
tables = [table for table in all_tables if table.source_id == source_id]
46+
assert len(tables) == 1
47+
assert len(tables[0].source_lineage) == 2
48+
lineage = tables[0].source_lineage[0]
49+
assert lineage.object_type == "DASHBOARD"
50+
assert lineage.object_id == dashboards[1].id
51+
assert lineage.other
52+
assert lineage.other.get("parent", None) == dashboards[1].parent
53+
assert lineage.other.get("name", None) == dashboards[1].name
54+
lineage = tables[0].source_lineage[1]
55+
assert lineage.object_type == "QUERY"
56+
assert lineage.object_id == source_id
57+
assert lineage.other
58+
assert lineage.other.get("name", None) == queries[1].name

tests/unit/source_code/test_queries.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
1010
from databricks.labs.ucx.source_code.queries import QueryLinter
11+
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
1112

1213

1314
@pytest.mark.parametrize(
@@ -25,38 +26,44 @@
2526
)
2627
def test_query_linter_collects_dfsas_from_queries(name, query, dfsa_paths, is_read, is_write, migration_index):
2728
ws = create_autospec(WorkspaceClient)
28-
crawlers = create_autospec(DirectFsAccessCrawler)
29+
dfsa_crawler = create_autospec(DirectFsAccessCrawler)
30+
used_tables_crawler = create_autospec(UsedTablesCrawler)
2931
query = LegacyQuery.from_dict({"parent": "workspace", "name": name, "query": query})
30-
linter = QueryLinter(ws, migration_index, crawlers, None)
32+
linter = QueryLinter(ws, migration_index, dfsa_crawler, used_tables_crawler, None)
3133
dfsas = linter.collect_dfsas_from_query("no-dashboard-id", query)
3234
ws.assert_not_called()
33-
crawlers.assert_not_called()
35+
dfsa_crawler.assert_not_called()
36+
used_tables_crawler.assert_not_called()
3437
assert set(dfsa.path for dfsa in dfsas) == set(dfsa_paths)
3538
assert all(dfsa.is_read == is_read for dfsa in dfsas)
3639
assert all(dfsa.is_write == is_write for dfsa in dfsas)
3740

3841

3942
def test_query_liner_refresh_report_writes_query_problems(migration_index, mock_backend) -> None:
4043
ws = create_autospec(WorkspaceClient)
41-
crawlers = create_autospec(DirectFsAccessCrawler)
42-
linter = QueryLinter(ws, migration_index, crawlers, None)
44+
dfsa_crawler = create_autospec(DirectFsAccessCrawler)
45+
used_tables_crawler = create_autospec(UsedTablesCrawler)
46+
linter = QueryLinter(ws, migration_index, dfsa_crawler, used_tables_crawler, None)
4347

4448
linter.refresh_report(mock_backend, inventory_database="test")
4549

4650
assert mock_backend.has_rows_written_for("`test`.query_problems")
4751
ws.dashboards.list.assert_called_once()
48-
crawlers.assert_not_called()
52+
dfsa_crawler.assert_not_called()
53+
used_tables_crawler.assert_not_called()
4954

5055

5156
def test_lints_queries(migration_index, mock_backend) -> None:
5257
with mock.patch("databricks.labs.ucx.source_code.queries.Redash") as mocked_redash:
5358
query = LegacyQuery(id="123", query="SELECT * from nowhere")
5459
mocked_redash.get_queries_from_dashboard.return_value = [query]
5560
ws = create_autospec(WorkspaceClient)
56-
crawlers = create_autospec(DirectFsAccessCrawler)
57-
linter = QueryLinter(ws, migration_index, crawlers, ["1"])
61+
dfsa_crawler = create_autospec(DirectFsAccessCrawler)
62+
used_tables_crawler = create_autospec(UsedTablesCrawler)
63+
linter = QueryLinter(ws, migration_index, dfsa_crawler, used_tables_crawler, ["1"])
5864
linter.refresh_report(mock_backend, inventory_database="test")
5965

6066
assert mock_backend.has_rows_written_for("`test`.query_problems")
6167
ws.dashboards.list.assert_not_called()
62-
crawlers.assert_not_called()
68+
dfsa_crawler.assert_not_called()
69+
used_tables_crawler.assert_not_called()

0 commit comments

Comments
 (0)