Skip to content

Commit 6432a28

Browse files
authored
Add table progress encoder (#3083)
## Changes Add pipeline progress encoder ### Linked issues Resolves #3061 Resolves #3064 ### Functionality - [x] modified existing workflow: `migration-progress` ### Tests - [x] added unit tests
1 parent 522f48a commit 6432a28

File tree

7 files changed

+152
-45
lines changed

7 files changed

+152
-45
lines changed

src/databricks/labs/ucx/contexts/workflow_task.py

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
from databricks.labs.blueprint.installation import Installation
55
from databricks.labs.lsql.backends import RuntimeBackend, SqlBackend
6-
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus
76
from databricks.sdk import WorkspaceClient, core
87

98
from databricks.labs.ucx.__about__ import __version__
@@ -21,13 +20,14 @@
2120
from databricks.labs.ucx.config import WorkspaceConfig
2221
from databricks.labs.ucx.contexts.application import GlobalContext
2322
from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler
24-
from databricks.labs.ucx.hive_metastore.grants import Grant
2523
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
26-
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
24+
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
2725
from databricks.labs.ucx.hive_metastore.udfs import Udf
2826
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
27+
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
2928
from databricks.labs.ucx.progress.history import ProgressEncoder
3029
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
30+
from databricks.labs.ucx.progress.tables import TableProgressEncoder
3131
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder
3232

3333
# As with GlobalContext, service factories unavoidably have a lot of public methods.
@@ -188,11 +188,10 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]:
188188
)
189189

190190
@cached_property
191-
def grants_progress(self) -> ProgressEncoder[Grant]:
192-
return ProgressEncoder(
191+
def grants_progress(self) -> GrantProgressEncoder:
192+
return GrantProgressEncoder(
193193
self.sql_backend,
194194
self.grant_ownership,
195-
Grant,
196195
self.parent_run_id,
197196
self.workspace_id,
198197
self.config.ucx_catalog,
@@ -221,23 +220,11 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
221220
)
222221

223222
@cached_property
224-
def tables_progress(self) -> ProgressEncoder[Table]:
225-
return ProgressEncoder(
223+
def tables_progress(self) -> TableProgressEncoder:
224+
return TableProgressEncoder(
226225
self.sql_backend,
227226
self.table_ownership,
228-
Table,
229-
self.parent_run_id,
230-
self.workspace_id,
231-
self.config.ucx_catalog,
232-
)
233-
234-
@cached_property
235-
def historical_table_migration_log(self) -> ProgressEncoder[TableMigrationStatus]:
236-
# TODO: merge into tables_progress
237-
return ProgressEncoder(
238-
self.sql_backend,
239-
self.table_migration_ownership,
240-
TableMigrationStatus,
227+
self.migration_status_refresher.index(force_refresh=False),
241228
self.parent_run_id,
242229
self.workspace_id,
243230
self.config.ucx_catalog,

src/databricks/labs/ucx/progress/grants.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,30 @@
11
from dataclasses import replace
22

3-
from databricks.labs.ucx.hive_metastore.grants import Grant
3+
from databricks.labs.lsql.backends import SqlBackend
4+
5+
from databricks.labs.ucx.hive_metastore.grants import Grant, GrantOwnership
46
from databricks.labs.ucx.progress.history import ProgressEncoder
57
from databricks.labs.ucx.progress.install import Historical
68

79

8-
class GrantsProgressEncoder(ProgressEncoder[Grant]):
10+
class GrantProgressEncoder(ProgressEncoder[Grant]):
911
"""Encoder class:Grant to class:History.
1012
1113
A failure for a grants implies it cannot be mapped to Unity Catalog.
1214
"""
1315

16+
def __init__(
17+
self,
18+
sql_backend: SqlBackend,
19+
ownership: GrantOwnership,
20+
run_id: int,
21+
workspace_id: int,
22+
catalog: str,
23+
schema: str = "multiworkspace",
24+
table: str = "historical",
25+
) -> None:
26+
super().__init__(sql_backend, ownership, Grant, run_id, workspace_id, catalog, schema, table)
27+
1428
def _encode_record_as_historical(self, record: Grant) -> Historical:
1529
historical = super()._encode_record_as_historical(record)
1630
failures = []
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from dataclasses import replace
2+
3+
from databricks.labs.lsql.backends import SqlBackend
4+
5+
from databricks.labs.ucx.hive_metastore.tables import Table
6+
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
7+
from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
8+
from databricks.labs.ucx.progress.history import ProgressEncoder
9+
from databricks.labs.ucx.progress.install import Historical
10+
11+
12+
class TableProgressEncoder(ProgressEncoder[Table]):
13+
"""Encoder class:Table to class:History.
14+
15+
A progress failure for a table means:
16+
- the table is not migrated yet
17+
- the associated grants have a failure
18+
"""
19+
20+
def __init__(
21+
self,
22+
sql_backend: SqlBackend,
23+
ownership: TableOwnership,
24+
table_migration_index: TableMigrationIndex,
25+
run_id: int,
26+
workspace_id: int,
27+
catalog: str,
28+
schema: str = "multiworkspace",
29+
table: str = "historical",
30+
) -> None:
31+
super().__init__(
32+
sql_backend,
33+
ownership,
34+
Table,
35+
run_id,
36+
workspace_id,
37+
catalog,
38+
schema,
39+
table,
40+
)
41+
self._table_migration_index = table_migration_index
42+
43+
def _encode_record_as_historical(self, record: Table) -> Historical:
44+
"""Encode record as historical.
45+
46+
A table failure means that the table is pending migration. Grants are purposefully lef out, because a grant
47+
might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure
48+
for tables that are migrated to UC with their relevant grants also being migrated.
49+
"""
50+
historical = super()._encode_record_as_historical(record)
51+
failures = []
52+
if not self._table_migration_index.is_migrated(record.database, record.name):
53+
failures.append("Pending migration")
54+
return replace(historical, failures=historical.failures + failures)

src/databricks/labs/ucx/progress/workflows.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ def crawl_tables(self, ctx: RuntimeContext) -> None:
4747
ctx.tables_crawler.snapshot(force_refresh=True)
4848

4949
@job_task(depends_on=[verify_prerequisites, crawl_tables], job_cluster="table_migration")
50+
def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
51+
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
52+
ctx.migration_status_refresher.snapshot(force_refresh=True)
53+
54+
@job_task(
55+
depends_on=[verify_prerequisites, crawl_tables, refresh_table_migration_status], job_cluster="table_migration"
56+
)
5057
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
5158
"""Update the history log with the latest tables inventory snapshot."""
5259
# The table migration cluster is not legacy-ACL enabled, so we can't crawl from here.
@@ -136,16 +143,6 @@ def crawl_cluster_policies(self, ctx: RuntimeContext) -> None:
136143
cluster_policies_snapshot = ctx.policies_crawler.snapshot(force_refresh=True)
137144
history_log.append_inventory_snapshot(cluster_policies_snapshot)
138145

139-
@job_task(depends_on=[verify_prerequisites, crawl_tables, verify_prerequisites], job_cluster="table_migration")
140-
def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
141-
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not.
142-
143-
The results of the scan are stored in the `$inventory.migration_status` inventory table.
144-
"""
145-
history_log = ctx.historical_table_migration_log
146-
migration_status_snapshot = ctx.migration_status_refresher.snapshot(force_refresh=True)
147-
history_log.append_inventory_snapshot(migration_status_snapshot)
148-
149146
@job_task(depends_on=[verify_prerequisites])
150147
def assess_dashboards(self, ctx: RuntimeContext):
151148
"""Scans all dashboards for migration issues in SQL code of embedded widgets.

tests/unit/progress/test_grants.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from databricks.labs.ucx.framework.owners import Ownership
66
from databricks.labs.ucx.framework.utils import escape_sql_identifier
77
from databricks.labs.ucx.hive_metastore.grants import Grant
8-
from databricks.labs.ucx.progress.grants import GrantsProgressEncoder
8+
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
99

1010

1111
@pytest.mark.parametrize(
@@ -17,13 +17,12 @@
1717
Grant("principal", "USAGE", "catalog"),
1818
],
1919
)
20-
def test_grants_progress_encoder_no_failures(mock_backend, grant: Grant) -> None:
20+
def test_grant_progress_encoder_no_failures(mock_backend, grant: Grant) -> None:
2121
ownership = create_autospec(Ownership)
2222
ownership.owner_of.return_value = "user"
23-
encoder = GrantsProgressEncoder(
23+
encoder = GrantProgressEncoder(
2424
mock_backend,
2525
ownership,
26-
Grant,
2726
run_id=1,
2827
workspace_id=123456789,
2928
catalog="test",
@@ -50,13 +49,12 @@ def test_grants_progress_encoder_no_failures(mock_backend, grant: Grant) -> None
5049
),
5150
],
5251
)
53-
def test_grants_progress_encoder_failures(mock_backend, grant: Grant, failure: str) -> None:
52+
def test_grant_progress_encoder_failures(mock_backend, grant: Grant, failure: str) -> None:
5453
ownership = create_autospec(Ownership)
5554
ownership.owner_of.return_value = "user"
56-
encoder = GrantsProgressEncoder(
55+
encoder = GrantProgressEncoder(
5756
mock_backend,
5857
ownership,
59-
Grant,
6058
run_id=1,
6159
workspace_id=123456789,
6260
catalog="test",

tests/unit/progress/test_tables.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from unittest.mock import create_autospec
2+
3+
import pytest
4+
5+
from databricks.labs.ucx.framework.owners import Ownership
6+
from databricks.labs.ucx.framework.utils import escape_sql_identifier
7+
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
8+
from databricks.labs.ucx.hive_metastore.tables import Table
9+
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
10+
from databricks.labs.ucx.progress.tables import TableProgressEncoder
11+
12+
13+
@pytest.mark.parametrize(
14+
"table",
15+
[
16+
Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
17+
],
18+
)
19+
def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
20+
ownership = create_autospec(Ownership)
21+
ownership.owner_of.return_value = "user"
22+
table_migration_index = create_autospec(TableMigrationIndex)
23+
table_migration_index.is_migrated.return_value = True
24+
grant_progress_encoder = create_autospec(GrantProgressEncoder)
25+
encoder = TableProgressEncoder(
26+
mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
27+
)
28+
29+
encoder.append_inventory_snapshot([table])
30+
31+
rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
32+
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
33+
assert len(rows[0].failures) == 0
34+
ownership.owner_of.assert_called_once()
35+
table_migration_index.is_migrated.assert_called_with(table.database, table.name)
36+
grant_progress_encoder.assert_not_called()
37+
38+
39+
@pytest.mark.parametrize(
40+
"table",
41+
[
42+
Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
43+
],
44+
)
45+
def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None:
46+
ownership = create_autospec(Ownership)
47+
ownership.owner_of.return_value = "user"
48+
table_migration_index = create_autospec(TableMigrationIndex)
49+
table_migration_index.is_migrated.return_value = False
50+
grant_progress_encoder = create_autospec(GrantProgressEncoder)
51+
encoder = TableProgressEncoder(
52+
mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
53+
)
54+
55+
encoder.append_inventory_snapshot([table])
56+
57+
rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
58+
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
59+
assert rows[0].failures == ["Pending migration"]
60+
ownership.owner_of.assert_called_once()
61+
table_migration_index.is_migrated.assert_called_with(table.database, table.name)
62+
grant_progress_encoder.assert_not_called()

tests/unit/progress/test_workflows.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,6 @@
2626
RuntimeContext.policies_crawler,
2727
RuntimeContext.policies_progress,
2828
),
29-
(
30-
MigrationProgress.refresh_table_migration_status,
31-
RuntimeContext.migration_status_refresher,
32-
RuntimeContext.historical_table_migration_log,
33-
),
3429
),
3530
)
3631
def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history_log) -> None:

0 commit comments

Comments
 (0)