Skip to content

Commit 36752ed

Browse files
asnarenfxJCZuurmond
authored
Update table-migration workflows to also capture updated migration progress into the history log (#3239)
## Changes The table-migration workflows already contained tasks at the end that log information about tables that still need to be migrated. The primary purpose of this PR is to update these workflows so they also capture updated progress information into the history log. Other changes include: - Updating the documentation for which workflows update which tables. - ~Updating the (singleton) encoder for table-history so that initialisation doesn't trigger an implicit refresh of the `TableMigrationStatus` data. Instead this is controlled at the workflow level, as intended.~ Moved to #3270. ### Linked issues ~Conflicts with #3200 (will need rebasing).~ (Resolved.) ### Functionality - updated documentation - modified existing workflows: - `migrate-tables` - `migrate-external-hiveserde-tables-in-place-experimental` - `migrate-external-tables-ctas` - `scan-tables-in-mounts-experimental` - `migrate-tables-in-mounts-experimental` ### Tests - manually tested - updated and new unit tests - updated and new integration tests --------- Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com> Co-authored-by: Cor Zuurmond <jczuurmond@protonmail.com>
1 parent a77ca8b commit 36752ed

File tree

8 files changed

+467
-90
lines changed

8 files changed

+467
-90
lines changed

docs/table_persistence.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Table utilization per workflow:
88

99
| Table | Generate Assessment | Update Migration Progress | Migrate Groups | Migrate External Tables | Upgrade Jobs | Migrate tables | Migrate Data Reconciliation |
1010
|--------------------------|---------------------|---------------------------|----------------|-------------------------|--------------|----------------|-----------------------------|
11-
| tables | RW | RW | | RO | | RO | |
11+
| tables | RW | RW | | RW | | RW | |
1212
| grants | RW | RW | | RW | | RW | |
1313
| mounts | RW | | | RO | RO | RO | |
1414
| permissions | RW | | RW | RO | | RO | |
@@ -30,7 +30,7 @@ Table utilization per workflow:
3030
| query_problems | RW | RW | | | | | |
3131
| workflow_problems | RW | RW | | | | | |
3232
| udfs | RW | RW | RO | | | | |
33-
| logs | RW | | RW | RW | | RW | RW |
33+
| logs | RW | RW | RW | RW | RW | RW | RW |
3434
| recon_results | | | | | | | RW |
3535
| redash_dashboards | RW | | | | | | RW |
3636
| lakeview_dashboards | RW | | | | | | RW |

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import re
44
from collections import defaultdict
5+
from collections.abc import Iterable
56
from functools import partial, cached_property
67

78
from databricks.labs.blueprint.parallel import Threads
@@ -18,8 +19,11 @@
1819
TableMapping,
1920
TableToMigrate,
2021
)
21-
22-
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher, TableMigrationIndex
22+
from databricks.labs.ucx.hive_metastore.table_migration_status import (
23+
TableMigrationStatusRefresher,
24+
TableMigrationStatus,
25+
TableMigrationIndex,
26+
)
2327
from databricks.labs.ucx.hive_metastore.tables import (
2428
MigrationCount,
2529
Table,
@@ -56,14 +60,11 @@ def __init__(
5660
self._migrate_grants = migrate_grants
5761
self._external_locations = external_locations
5862

59-
def get_remaining_tables(self) -> list[Table]:
60-
migration_index = self.index(force_refresh=True)
61-
table_rows = []
63+
def warn_about_remaining_non_migrated_tables(self, migration_statuses: Iterable[TableMigrationStatus]) -> None:
64+
migration_index = TableMigrationIndex(migration_statuses)
6265
for crawled_table in self._tables_crawler.snapshot():
6366
if not migration_index.is_migrated(crawled_table.database, crawled_table.name):
64-
table_rows.append(crawled_table)
6567
logger.warning(f"remained-hive-metastore-table: {crawled_table.key}")
66-
return table_rows
6768

6869
def index(self, *, force_refresh: bool = False):
6970
return self._migration_status_refresher.index(force_refresh=force_refresh)

src/databricks/labs/ucx/hive_metastore/table_migration_status.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_
9090
self._tables_crawler = tables_crawler
9191

9292
def index(self, *, force_refresh: bool = False) -> TableMigrationIndex:
93-
return TableMigrationIndex(list(self.snapshot(force_refresh=force_refresh)))
93+
return TableMigrationIndex(self.snapshot(force_refresh=force_refresh))
9494

9595
def get_seen_tables(self) -> dict[str, str]:
9696
seen_tables: dict[str, str] = {}

src/databricks/labs/ucx/hive_metastore/workflows.py

Lines changed: 200 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import datetime as dt
2+
13
from databricks.labs.ucx.assessment.workflows import Assessment
24
from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
35
from databricks.labs.ucx.framework.tasks import Workflow, job_task
@@ -57,10 +59,53 @@ def migrate_views(self, ctx: RuntimeContext):
5759
"""
5860
ctx.tables_migrator.migrate_tables(what=What.VIEW)
5961

60-
@job_task(job_cluster="user_isolation", depends_on=[migrate_views])
61-
def update_migration_status(self, ctx: RuntimeContext):
62-
"""Refresh the migration status to present it in the dashboard."""
63-
ctx.tables_migrator.get_remaining_tables()
62+
@job_task(job_cluster="user_isolation")
63+
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
64+
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
65+
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
66+
67+
@job_task(
68+
depends_on=[
69+
convert_managed_table,
70+
migrate_external_tables_sync,
71+
migrate_dbfs_root_delta_tables,
72+
migrate_dbfs_root_non_delta_tables,
73+
migrate_views,
74+
verify_progress_tracking_prerequisites,
75+
],
76+
)
77+
def update_table_inventory(self, ctx: RuntimeContext) -> None:
78+
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
79+
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
80+
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
81+
# Step 1 of 3: Just refresh the tables inventory.
82+
ctx.tables_crawler.snapshot(force_refresh=True)
83+
84+
@job_task(depends_on=[verify_progress_tracking_prerequisites, update_table_inventory], job_cluster="user_isolation")
85+
def update_migration_status(self, ctx: RuntimeContext) -> None:
86+
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
87+
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
88+
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
89+
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
90+
91+
@job_task(
92+
depends_on=[verify_progress_tracking_prerequisites, update_migration_status], job_cluster="user_isolation"
93+
)
94+
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
95+
"""Update the history log with the latest tables inventory and migration status."""
96+
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
97+
# history log.
98+
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
99+
tables_snapshot = ctx.tables_crawler.snapshot()
100+
# Note: encoding the Table records will trigger loading of the migration-status data.
101+
ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
102+
103+
@job_task(
104+
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
105+
)
106+
def record_workflow_run(self, ctx: RuntimeContext) -> None:
107+
"""Record the workflow run of this workflow."""
108+
ctx.workflow_run_recorder.record()
64109

65110

66111
class MigrateHiveSerdeTablesInPlace(Workflow):
@@ -86,10 +131,44 @@ def migrate_views(self, ctx: RuntimeContext):
86131
"""
87132
ctx.tables_migrator.migrate_tables(what=What.VIEW)
88133

89-
@job_task(job_cluster="user_isolation", depends_on=[migrate_views])
90-
def update_migration_status(self, ctx: RuntimeContext):
91-
"""Refresh the migration status to present it in the dashboard."""
92-
ctx.tables_migrator.get_remaining_tables()
134+
@job_task(job_cluster="user_isolation")
135+
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
136+
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
137+
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
138+
139+
@job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_views])
140+
def update_table_inventory(self, ctx: RuntimeContext) -> None:
141+
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
142+
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
143+
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
144+
# Step 1 of 3: Just refresh the tables inventory.
145+
ctx.tables_crawler.snapshot(force_refresh=True)
146+
147+
@job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
148+
def update_migration_status(self, ctx: RuntimeContext) -> None:
149+
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
150+
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
151+
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
152+
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
153+
154+
@job_task(
155+
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
156+
)
157+
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
158+
"""Update the history log with the latest tables inventory and migration status."""
159+
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
160+
# history log.
161+
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
162+
tables_snapshot = ctx.tables_crawler.snapshot()
163+
# Note: encoding the Table records will trigger loading of the migration-status data.
164+
ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
165+
166+
@job_task(
167+
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
168+
)
169+
def record_workflow_run(self, ctx: RuntimeContext) -> None:
170+
"""Record the workflow run of this workflow."""
171+
ctx.workflow_run_recorder.record()
93172

94173

95174
class MigrateExternalTablesCTAS(Workflow):
@@ -120,10 +199,51 @@ def migrate_views(self, ctx: RuntimeContext):
120199
"""
121200
ctx.tables_migrator.migrate_tables(what=What.VIEW)
122201

123-
@job_task(job_cluster="user_isolation", depends_on=[migrate_views])
124-
def update_migration_status(self, ctx: RuntimeContext):
125-
"""Refresh the migration status to present it in the dashboard."""
126-
ctx.tables_migrator.get_remaining_tables()
202+
@job_task(job_cluster="user_isolation")
203+
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
204+
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
205+
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
206+
207+
@job_task(
208+
depends_on=[
209+
verify_progress_tracking_prerequisites,
210+
migrate_views,
211+
migrate_hive_serde_ctas,
212+
migrate_other_external_ctas,
213+
]
214+
)
215+
def update_table_inventory(self, ctx: RuntimeContext) -> None:
216+
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
217+
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
218+
# UC-enabled, so cannot both snapshot and update the history log from the same location.
219+
# Step 1 of 3: Just refresh the tables inventory.
220+
ctx.tables_crawler.snapshot(force_refresh=True)
221+
222+
@job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
223+
def update_migration_status(self, ctx: RuntimeContext) -> None:
224+
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
225+
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
226+
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
227+
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
228+
229+
@job_task(
230+
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
231+
)
232+
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
233+
"""Update the history log with the latest tables inventory and migration status."""
234+
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
235+
# history log.
236+
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
237+
tables_snapshot = ctx.tables_crawler.snapshot()
238+
# Note: encoding the Table records will trigger loading of the migration-status data.
239+
ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
240+
241+
@job_task(
242+
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
243+
)
244+
def record_workflow_run(self, ctx: RuntimeContext) -> None:
245+
"""Record the workflow run of this workflow."""
246+
ctx.workflow_run_recorder.record()
127247

128248

129249
class ScanTablesInMounts(Workflow):
@@ -137,10 +257,36 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
137257
replacing any existing content that might be present."""
138258
ctx.tables_in_mounts.snapshot(force_refresh=True)
139259

140-
@job_task(job_cluster="user_isolation", depends_on=[scan_tables_in_mounts_experimental])
141-
def update_migration_status(self, ctx: RuntimeContext):
142-
"""Refresh the migration status to present it in the dashboard."""
143-
ctx.tables_migrator.get_remaining_tables()
260+
@job_task(job_cluster="user_isolation")
261+
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
262+
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
263+
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
264+
265+
@job_task(
266+
job_cluster="user_isolation",
267+
depends_on=[verify_progress_tracking_prerequisites, scan_tables_in_mounts_experimental],
268+
)
269+
def update_migration_status(self, ctx: RuntimeContext) -> None:
270+
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
271+
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
272+
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
273+
274+
@job_task(
275+
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
276+
)
277+
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
278+
"""Update the history log with the latest tables inventory and migration status."""
279+
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
280+
tables_snapshot = ctx.tables_crawler.snapshot()
281+
# Note: encoding the Table records will trigger loading of the migration-status data.
282+
ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
283+
284+
@job_task(
285+
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
286+
)
287+
def record_workflow_run(self, ctx: RuntimeContext) -> None:
288+
"""Record the workflow run of this workflow."""
289+
ctx.workflow_run_recorder.record()
144290

145291

146292
class MigrateTablesInMounts(Workflow):
@@ -152,7 +298,41 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
152298
"""[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement."""
153299
ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT)
154300

155-
@job_task(job_cluster="user_isolation", depends_on=[migrate_tables_in_mounts_experimental])
156-
def update_migration_status(self, ctx: RuntimeContext):
157-
"""Refresh the migration status to present it in the dashboard."""
158-
ctx.tables_migrator.get_remaining_tables()
301+
@job_task(job_cluster="user_isolation")
302+
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
303+
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
304+
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
305+
306+
@job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_tables_in_mounts_experimental])
307+
def update_table_inventory(self, ctx: RuntimeContext) -> None:
308+
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
309+
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
310+
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
311+
# Step 1 of 3: Just refresh the tables inventory.
312+
ctx.tables_crawler.snapshot(force_refresh=True)
313+
314+
@job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
315+
def update_migration_status(self, ctx: RuntimeContext) -> None:
316+
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
317+
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
318+
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
319+
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
320+
321+
@job_task(
322+
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
323+
)
324+
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
325+
"""Update the history log with the latest tables inventory and migration status."""
326+
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
327+
# history log.
328+
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
329+
tables_snapshot = ctx.tables_crawler.snapshot()
330+
# Note: encoding the Table records will trigger loading of the migration-status data.
331+
ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
332+
333+
@job_task(
334+
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
335+
)
336+
def record_workflow_run(self, ctx: RuntimeContext) -> None:
337+
"""Record the workflow run of this workflow."""
338+
ctx.workflow_run_recorder.record()

0 commit comments

Comments
 (0)