Update table-migration workflows to also capture updated migration progress into the history log #3239
Merged
53 commits
905fefc
Remove some unnecessary code.
asnare c23c38a
Update table history logger to not trigger a refresh of the migration…
asnare 9ebb26c
Consistent import.
asnare e861963
Update the various table-migration workflows to also update the histo…
asnare cfe1486
All workflows update the logs table.
asnare 145cd7d
Table migration workflows also update the tables inventory (at the end).
asnare 5289582
Merge branch 'main' into more-workflow-history-snapshots
asnare 6e5e4ae
Switch to multi-line f""" """-string.
asnare 778ad10
Merge branch 'main' into more-workflow-history-snapshots
asnare 1ce8e05
Fix mock return value for crawler snapshot.
asnare 78787df
Merge branch 'main' into more-workflow-history-snapshots
asnare 788789f
Switch to specialisation (limited to TableProgressEncoder) for ensuri…
asnare 891b3b7
Merge branch 'main' into more-workflow-history-snapshots
asnare f9bf219
Merge branch 'main' into more-workflow-history-snapshots
asnare da3b15b
Back out changes relating to the way the migration-status information…
asnare ac8586a
Back out more changes that are either not needed or made on other PRs.
asnare 4b3717f
Remove comment that is no longer relevant.
asnare c80a9c6
Verify prerequisites for updating the migration-progress prior to the…
asnare f638cb5
No need to mention the assessment; we won't reach this point of the w…
asnare 2d398f4
Use TODO marker instead of warning to highlight what we'd prefer to h…
asnare df9f689
Merge branch 'main' into more-workflow-history-snapshots
nfx 4e579fd
Merge branch 'main' into more-workflow-history-snapshots
nfx e4d4220
Merge branch 'main' into more-workflow-history-snapshots
asnare 14cdc77
Merge branch 'main' into more-workflow-history-snapshots
nfx 1a32bd7
Merge branch 'main' into more-workflow-history-snapshots
asnare 1df72bf
Merge branch 'main' into more-workflow-history-snapshots
asnare dbafbac
Ensure the assessment has finished before table-migration runs, and t…
asnare 839ac76
Ensure the progress-migration catalog is configured.
asnare 862b5bf
Remove unused fixture.
asnare 54d06f7
Merge branch 'main' into more-workflow-history-snapshots
asnare 12a1d9f
Merge branch 'main' into more-workflow-history-snapshots
asnare 7774a78
Merge branch 'main' into more-workflow-history-snapshots
asnare a074fd7
Remove unused fixture.
asnare 034c91b
Split over several lines to make debugging easier.
asnare dd74dd9
Refactor for debugging.
asnare 4dde220
Fix test to invoke the workflow its verifying.
asnare 007e23b
Merge branch 'main' into more-workflow-history-snapshots
asnare 2dbc2e4
Adjust the debugging convenience.
asnare 375276e
Merge branch 'main' into more-workflow-history-snapshots
asnare e30d2cc
Configure test with new infrastructure.
asnare c777794
Fix linting problems.
asnare b763886
Merge branch 'main' into more-workflow-history-snapshots
asnare c2127a5
Merge branch 'main' into more-workflow-history-snapshots
asnare babbc69
Rename task to verify progress-tracking prerequisites.
asnare 1e45dc0
Formatting.
asnare c966766
Rename method to better indicate purpose.
asnare 5e653de
Inline a local variable.
asnare ade16ae
Remove misleading TODO marker in lieu of #3422.
asnare bad20f2
Use alternate plural spelling.
asnare c13b423
Merge branch 'main' into more-workflow-history-snapshots
asnare 7aa937f
Merge branch 'main' into more-workflow-history-snapshots
asnare fe34a3b
Use context consistently
JCZuurmond e0aeb4c
Merge branch 'main' into more-workflow-history-snapshots
asnare
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
import datetime as dt | ||
|
||
from databricks.labs.ucx.assessment.workflows import Assessment | ||
from databricks.labs.ucx.contexts.workflow_task import RuntimeContext | ||
from databricks.labs.ucx.framework.tasks import Workflow, job_task | ||
|
@@ -57,10 +59,53 @@ def migrate_views(self, ctx: RuntimeContext): | |
""" | ||
ctx.tables_migrator.migrate_tables(what=What.VIEW) | ||
|
||
@job_task(job_cluster="user_isolation", depends_on=[migrate_views]) | ||
def update_migration_status(self, ctx: RuntimeContext): | ||
"""Refresh the migration status to present it in the dashboard.""" | ||
ctx.tables_migrator.get_remaining_tables() | ||
@job_task(job_cluster="user_isolation") | ||
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: | ||
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" | ||
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) | ||
Review comment: This forces the UCX catalog to be created before table migration, whereas previously it was not a prerequisite.
|
||
@job_task( | ||
depends_on=[ | ||
convert_managed_table, | ||
migrate_external_tables_sync, | ||
migrate_dbfs_root_delta_tables, | ||
migrate_dbfs_root_non_delta_tables, | ||
migrate_views, | ||
verify_progress_tracking_prerequisites, | ||
], | ||
) | ||
def update_table_inventory(self, ctx: RuntimeContext) -> None: | ||
"""Refresh the tables inventory, prior to updating the migration status of all the tables.""" | ||
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not | ||
# UC-enabled, so we cannot both snapshot and update the history log from the same location. | ||
# Step 1 of 3: Just refresh the tables inventory. | ||
ctx.tables_crawler.snapshot(force_refresh=True) | ||
|
||
@job_task(depends_on=[verify_progress_tracking_prerequisites, update_table_inventory], job_cluster="user_isolation") | ||
def update_migration_status(self, ctx: RuntimeContext) -> None: | ||
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" | ||
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) | ||
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) | ||
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress) | ||
|
||
@job_task( | ||
depends_on=[verify_progress_tracking_prerequisites, update_migration_status], job_cluster="user_isolation" | ||
) | ||
def update_tables_history_log(self, ctx: RuntimeContext) -> None: | ||
"""Update the history log with the latest tables inventory and migration status.""" | ||
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the | ||
# history log. | ||
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. | ||
tables_snapshot = ctx.tables_crawler.snapshot() | ||
# Note: encoding the Table records will trigger loading of the migration-status data. | ||
ctx.tables_progress.append_inventory_snapshot(tables_snapshot) | ||
|
||
@job_task( | ||
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] | ||
) | ||
def record_workflow_run(self, ctx: RuntimeContext) -> None: | ||
"""Record the workflow run of this workflow.""" | ||
ctx.workflow_run_recorder.record() | ||
|
||
|
||
class MigrateHiveSerdeTablesInPlace(Workflow): | ||
|
@@ -86,10 +131,44 @@ def migrate_views(self, ctx: RuntimeContext): | |
""" | ||
ctx.tables_migrator.migrate_tables(what=What.VIEW) | ||
|
||
@job_task(job_cluster="user_isolation", depends_on=[migrate_views]) | ||
def update_migration_status(self, ctx: RuntimeContext): | ||
"""Refresh the migration status to present it in the dashboard.""" | ||
ctx.tables_migrator.get_remaining_tables() | ||
@job_task(job_cluster="user_isolation") | ||
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: | ||
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" | ||
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) | ||
|
||
@job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_views]) | ||
def update_table_inventory(self, ctx: RuntimeContext) -> None: | ||
"""Refresh the tables inventory, prior to updating the migration status of all the tables.""" | ||
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not | ||
# UC-enabled, so we cannot both snapshot and update the history log from the same location. | ||
# Step 1 of 3: Just refresh the tables inventory. | ||
ctx.tables_crawler.snapshot(force_refresh=True) | ||
|
||
@job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory]) | ||
def update_migration_status(self, ctx: RuntimeContext) -> None: | ||
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" | ||
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) | ||
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) | ||
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress) | ||
|
||
@job_task( | ||
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] | ||
) | ||
def update_tables_history_log(self, ctx: RuntimeContext) -> None: | ||
"""Update the history log with the latest tables inventory and migration status.""" | ||
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the | ||
# history log. | ||
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. | ||
tables_snapshot = ctx.tables_crawler.snapshot() | ||
# Note: encoding the Table records will trigger loading of the migration-status data. | ||
ctx.tables_progress.append_inventory_snapshot(tables_snapshot) | ||
|
||
@job_task( | ||
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] | ||
) | ||
def record_workflow_run(self, ctx: RuntimeContext) -> None: | ||
"""Record the workflow run of this workflow.""" | ||
ctx.workflow_run_recorder.record() | ||
|
||
|
||
class MigrateExternalTablesCTAS(Workflow): | ||
|
@@ -120,10 +199,51 @@ def migrate_views(self, ctx: RuntimeContext): | |
""" | ||
ctx.tables_migrator.migrate_tables(what=What.VIEW) | ||
|
||
@job_task(job_cluster="user_isolation", depends_on=[migrate_views]) | ||
def update_migration_status(self, ctx: RuntimeContext): | ||
"""Refresh the migration status to present it in the dashboard.""" | ||
ctx.tables_migrator.get_remaining_tables() | ||
@job_task(job_cluster="user_isolation") | ||
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: | ||
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" | ||
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) | ||
|
||
@job_task( | ||
depends_on=[ | ||
verify_progress_tracking_prerequisites, | ||
migrate_views, | ||
migrate_hive_serde_ctas, | ||
migrate_other_external_ctas, | ||
] | ||
) | ||
def update_table_inventory(self, ctx: RuntimeContext) -> None: | ||
"""Refresh the tables inventory, prior to updating the migration status of all the tables.""" | ||
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not | ||
# UC-enabled, so we cannot both snapshot and update the history log from the same location. | ||
# Step 1 of 3: Just refresh the tables inventory. | ||
ctx.tables_crawler.snapshot(force_refresh=True) | ||
|
||
@job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory]) | ||
def update_migration_status(self, ctx: RuntimeContext) -> None: | ||
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" | ||
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) | ||
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) | ||
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress) | ||
|
||
@job_task( | ||
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] | ||
) | ||
def update_tables_history_log(self, ctx: RuntimeContext) -> None: | ||
"""Update the history log with the latest tables inventory and migration status.""" | ||
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the | ||
# history log. | ||
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. | ||
tables_snapshot = ctx.tables_crawler.snapshot() | ||
# Note: encoding the Table records will trigger loading of the migration-status data. | ||
ctx.tables_progress.append_inventory_snapshot(tables_snapshot) | ||
|
||
@job_task( | ||
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] | ||
) | ||
def record_workflow_run(self, ctx: RuntimeContext) -> None: | ||
"""Record the workflow run of this workflow.""" | ||
ctx.workflow_run_recorder.record() | ||
|
||
|
||
class ScanTablesInMounts(Workflow): | ||
|
@@ -137,10 +257,36 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext): | |
replacing any existing content that might be present.""" | ||
ctx.tables_in_mounts.snapshot(force_refresh=True) | ||
|
||
@job_task(job_cluster="user_isolation", depends_on=[scan_tables_in_mounts_experimental]) | ||
def update_migration_status(self, ctx: RuntimeContext): | ||
"""Refresh the migration status to present it in the dashboard.""" | ||
ctx.tables_migrator.get_remaining_tables() | ||
@job_task(job_cluster="user_isolation") | ||
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: | ||
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" | ||
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) | ||
|
||
@job_task( | ||
job_cluster="user_isolation", | ||
depends_on=[verify_progress_tracking_prerequisites, scan_tables_in_mounts_experimental], | ||
) | ||
def update_migration_status(self, ctx: RuntimeContext) -> None: | ||
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" | ||
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) | ||
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress) | ||
|
||
@job_task( | ||
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] | ||
) | ||
def update_tables_history_log(self, ctx: RuntimeContext) -> None: | ||
"""Update the history log with the latest tables inventory and migration status.""" | ||
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. | ||
tables_snapshot = ctx.tables_crawler.snapshot() | ||
# Note: encoding the Table records will trigger loading of the migration-status data. | ||
ctx.tables_progress.append_inventory_snapshot(tables_snapshot) | ||
|
||
@job_task( | ||
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] | ||
) | ||
def record_workflow_run(self, ctx: RuntimeContext) -> None: | ||
"""Record the workflow run of this workflow.""" | ||
ctx.workflow_run_recorder.record() | ||
|
||
|
||
class MigrateTablesInMounts(Workflow): | ||
|
@@ -152,7 +298,41 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext): | |
"""[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement.""" | ||
ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT) | ||
|
||
@job_task(job_cluster="user_isolation", depends_on=[migrate_tables_in_mounts_experimental]) | ||
def update_migration_status(self, ctx: RuntimeContext): | ||
"""Refresh the migration status to present it in the dashboard.""" | ||
ctx.tables_migrator.get_remaining_tables() | ||
@job_task(job_cluster="user_isolation") | ||
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: | ||
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" | ||
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) | ||
|
||
@job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_tables_in_mounts_experimental]) | ||
def update_table_inventory(self, ctx: RuntimeContext) -> None: | ||
"""Refresh the tables inventory, prior to updating the migration status of all the tables.""" | ||
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not | ||
# UC-enabled, so we cannot both snapshot and update the history log from the same location. | ||
# Step 1 of 3: Just refresh the tables inventory. | ||
ctx.tables_crawler.snapshot(force_refresh=True) | ||
|
||
@job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory]) | ||
def update_migration_status(self, ctx: RuntimeContext) -> None: | ||
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" | ||
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) | ||
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) | ||
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress) | ||
|
||
@job_task( | ||
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] | ||
) | ||
def update_tables_history_log(self, ctx: RuntimeContext) -> None: | ||
"""Update the history log with the latest tables inventory and migration status.""" | ||
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the | ||
# history log. | ||
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. | ||
tables_snapshot = ctx.tables_crawler.snapshot() | ||
# Note: encoding the Table records will trigger loading of the migration-status data. | ||
ctx.tables_progress.append_inventory_snapshot(tables_snapshot) | ||
|
||
@job_task( | ||
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] | ||
) | ||
def record_workflow_run(self, ctx: RuntimeContext) -> None: | ||
"""Record the workflow run of this workflow.""" | ||
ctx.workflow_run_recorder.record() |
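
The ordering that the `depends_on` lists impose on the first workflow in the diff can be checked with a small standalone sketch. It uses only the Python standard library (`graphlib`), not the UCX `job_task` machinery. The `DEPENDS_ON` dictionary and the `execution_order` helper are hypothetical names introduced for illustration, and the migration tasks whose decorators fall outside the diff hunk are assumed here to have no dependencies of their own.

```python
from graphlib import TopologicalSorter

# Task name -> names of the tasks it depends on, transcribed from the
# depends_on lists of the first workflow in the diff above.
# Assumption: the migrate_*/convert_* tasks are treated as roots, because
# their own decorators are not visible in the diff hunk.
DEPENDS_ON = {
    "verify_progress_tracking_prerequisites": set(),
    "update_table_inventory": {
        "convert_managed_table",
        "migrate_external_tables_sync",
        "migrate_dbfs_root_delta_tables",
        "migrate_dbfs_root_non_delta_tables",
        "migrate_views",
        "verify_progress_tracking_prerequisites",
    },
    "update_migration_status": {
        "verify_progress_tracking_prerequisites",
        "update_table_inventory",
    },
    "update_tables_history_log": {
        "verify_progress_tracking_prerequisites",
        "update_migration_status",
    },
    "record_workflow_run": {
        "verify_progress_tracking_prerequisites",
        "update_tables_history_log",
    },
}


def execution_order(depends_on: dict[str, set[str]]) -> list[str]:
    """Return one valid execution order (hypothetical helper, not UCX API)."""
    # TopologicalSorter also adds tasks that only appear as dependencies.
    return list(TopologicalSorter(depends_on).static_order())


order = execution_order(DEPENDS_ON)
# The last four entries are the three update steps plus the run recorder.
print(order[-4:])
```

Because each of the three update steps depends on the one before it, any valid schedule ends with the inventory refresh, then the migration-status refresh, then the history-log update, and finally the workflow-run record.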