From e66ec2dac884acd4bcb90a6ff94eb6ba5d39d4ae Mon Sep 17 00:00:00 2001 From: pritishpai Date: Wed, 25 Jun 2025 10:31:38 -0400 Subject: [PATCH 01/18] Full refresh all crawlers for each assessment run --- .../labs/ucx/assessment/workflows.py | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py index 3c0efbd7af..b3e63a6b7d 100644 --- a/src/databricks/labs/ucx/assessment/workflows.py +++ b/src/databricks/labs/ucx/assessment/workflows.py @@ -18,14 +18,14 @@ def crawl_tables(self, ctx: RuntimeContext): `$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that cannot easily be migrated to Unity Catalog.""" - ctx.tables_crawler.snapshot() + ctx.tables_crawler.snapshot(force_refresh=True) @job_task def crawl_udfs(self, ctx: RuntimeContext): """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for issues with grants that cannot be migrated to Unit Catalog.""" - ctx.udfs_crawler.snapshot() + ctx.udfs_crawler.snapshot(force_refresh=True) @job_task(job_cluster="tacl") def setup_tacl(self, ctx: RuntimeContext): @@ -40,7 +40,7 @@ def crawl_grants(self, ctx: RuntimeContext): Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table ACLs enabled and available for retrieval.""" - ctx.grants_crawler.snapshot() + ctx.grants_crawler.snapshot(force_refresh=True) @job_task(depends_on=[crawl_tables]) def estimate_table_size_for_migration(self, ctx: RuntimeContext): @@ -48,7 +48,7 @@ def estimate_table_size_for_migration(self, ctx: RuntimeContext): "synced". These tables will have to be cloned in the migration process. Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes. The table size is a factor in deciding whether to clone these tables.""" - ctx.table_size_crawler.snapshot() + ctx.table_size_crawler.snapshot(force_refresh=True) @job_task def crawl_mounts(self, ctx: RuntimeContext): @@ -58,7 +58,7 @@ def crawl_mounts(self, ctx: RuntimeContext): The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently storing this information in the `$inventory.mounts` table. 
This is crucial for planning the migration.""" - ctx.mounts_crawler.snapshot() + ctx.mounts_crawler.snapshot(force_refresh=True) @job_task(depends_on=[crawl_mounts, crawl_tables]) def guess_external_locations(self, ctx: RuntimeContext): @@ -70,7 +70,7 @@ def guess_external_locations(self, ctx: RuntimeContext): - Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead - Scanning all these locations to identify folders that can act as shared path prefixes - These identified external locations will be created subsequently prior to the actual table migration""" - ctx.external_locations.snapshot() + ctx.external_locations.snapshot(force_refresh=True) @job_task def assess_jobs(self, ctx: RuntimeContext): @@ -83,7 +83,7 @@ def assess_jobs(self, ctx: RuntimeContext): - Clusters with incompatible Spark config tags - Clusters referencing DBFS locations in one or more config options """ - ctx.jobs_crawler.snapshot() + ctx.jobs_crawler.snapshot(force_refresh=True) @job_task def assess_clusters(self, ctx: RuntimeContext): @@ -96,7 +96,7 @@ def assess_clusters(self, ctx: RuntimeContext): - Clusters with incompatible spark config tags - Clusters referencing DBFS locations in one or more config options """ - ctx.clusters_crawler.snapshot() + ctx.clusters_crawler.snapshot(force_refresh=True) @job_task def assess_pipelines(self, ctx: RuntimeContext): @@ -109,7 +109,7 @@ def assess_pipelines(self, ctx: RuntimeContext): Subsequently, a list of all the pipelines with matching configurations are stored in the `$inventory.pipelines` table.""" - ctx.pipelines_crawler.snapshot() + ctx.pipelines_crawler.snapshot(force_refresh=True) @job_task def assess_incompatible_submit_runs(self, ctx: RuntimeContext): @@ -122,7 +122,7 @@ def assess_incompatible_submit_runs(self, ctx: RuntimeContext): It also combines several submit runs under a single pseudo_id based on hash of the submit run configuration. 
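Every call in this patch relies on the crawler base class honouring a force_refresh flag on snapshot(). A minimal sketch of that contract — class, attribute, and helper names here are illustrative stand-ins, not the actual UCX internals:

class ExampleCrawler:
    """Sketch of the snapshot() contract the calls above assume."""

    def __init__(self) -> None:
        self._cache: list[str] | None = None  # stands in for the persisted inventory table

    def _crawl(self) -> list[str]:
        return ["hive_metastore.db.table1"]  # stands in for a real workspace scan

    def snapshot(self, *, force_refresh: bool = False) -> list[str]:
        if force_refresh or self._cache is None:
            self._cache = self._crawl()  # re-crawl and overwrite any earlier results
        return self._cache


crawler = ExampleCrawler()
crawler.snapshot()                    # first run: crawls and persists
crawler.snapshot(force_refresh=True)  # rerun: bypasses the cache and crawls again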
Subsequently, a list of all the incompatible runs with failures are stored in the `$inventory.submit_runs` table.""" - ctx.submit_runs_crawler.snapshot() + ctx.submit_runs_crawler.snapshot(force_refresh=True) @job_task def crawl_cluster_policies(self, ctx: RuntimeContext): @@ -133,7 +133,7 @@ def crawl_cluster_policies(self, ctx: RuntimeContext): Subsequently, a list of all the policies with matching configurations are stored in the `$inventory.policies` table.""" - ctx.policies_crawler.snapshot() + ctx.policies_crawler.snapshot(force_refresh=True) @job_task(cloud="azure") def assess_azure_service_principals(self, ctx: RuntimeContext): @@ -147,7 +147,7 @@ def assess_azure_service_principals(self, ctx: RuntimeContext): Subsequently, the list of all the Azure Service Principals referred in those configurations are saved in the `$inventory.azure_service_principals` table.""" if ctx.is_azure: - ctx.azure_service_principal_crawler.snapshot() + ctx.azure_service_principal_crawler.snapshot(force_refresh=True) @job_task def assess_global_init_scripts(self, ctx: RuntimeContext): @@ -156,7 +156,7 @@ def assess_global_init_scripts(self, ctx: RuntimeContext): It looks in: - the list of all the global init scripts are saved in the `$inventory.global_init_scripts` table.""" - ctx.global_init_scripts_crawler.snapshot() + ctx.global_init_scripts_crawler.snapshot(force_refresh=True) @job_task def workspace_listing(self, ctx: RuntimeContext): @@ -168,7 +168,7 @@ def workspace_listing(self, ctx: RuntimeContext): if not ctx.config.use_legacy_permission_migration: logger.info("Skipping workspace listing as legacy permission migration is disabled.") return - ctx.workspace_listing.snapshot() + ctx.workspace_listing.snapshot(force_refresh=True) @job_task(depends_on=[crawl_grants, workspace_listing]) def crawl_permissions(self, ctx: RuntimeContext): @@ -182,22 +182,22 @@ def crawl_permissions(self, ctx: RuntimeContext): return permission_manager = ctx.permission_manager permission_manager.reset() - permission_manager.snapshot() + permission_manager.snapshot(force_refresh=True) @job_task def crawl_groups(self, ctx: RuntimeContext): """Scans all groups for the local group migration scope""" - ctx.group_manager.snapshot() + ctx.group_manager.snapshot(force_refresh=True) @job_task def crawl_redash_dashboards(self, ctx: RuntimeContext): """Scans all Redash dashboards.""" - ctx.redash_crawler.snapshot() + ctx.redash_crawler.snapshot(force_refresh=True) @job_task def crawl_lakeview_dashboards(self, ctx: RuntimeContext): """Scans all Lakeview dashboards.""" - ctx.lakeview_crawler.snapshot() + ctx.lakeview_crawler.snapshot(force_refresh=True) @job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards]) def assess_dashboards(self, ctx: RuntimeContext): From 0ca3981d2d211cc2273174d8efd588525e43c09b Mon Sep 17 00:00:00 2001 From: pritishpai Date: Wed, 25 Jun 2025 11:02:02 -0400 Subject: [PATCH 02/18] Add documentation --- docs/ucx/docs/reference/workflows/index.mdx | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/ucx/docs/reference/workflows/index.mdx b/docs/ucx/docs/reference/workflows/index.mdx index d6e1c28e39..c8f58761ec 100644 --- a/docs/ucx/docs/reference/workflows/index.mdx +++ b/docs/ucx/docs/reference/workflows/index.mdx @@ -98,9 +98,7 @@ See [this guide](/docs/reference/assessment) for more details. Proceed to the [group migration workflow](/docs/reference/workflows#group-migration-workflow) below or go back to the [migration process diagram](/docs/process/). 
-The UCX assessment workflow is designed to only run once, re-running will **not** update the existing results. If the -inventory and findings for a workspace need to be updated then first reinstall UCX by [uninstalling](/docs/installation#uninstall-ucx) -and [installing](/docs/installation) it again. +> **⚠️ Caution:** Re-running the UCX assessment workflow will **overwrite existing results**. For large workspaces, this process can take a significant amount of time and consume substantial resources. Only re-run if you are certain it is necessary. ## Group migration workflow From 5d567e1d0ff28e0513ef68a5ac995aff73a09823 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Wed, 25 Jun 2025 13:35:58 -0400 Subject: [PATCH 03/18] Make force_refresh of assessment parameter driven --- .../labs/ucx/assessment/workflows.py | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py index b3e63a6b7d..64420ada94 100644 --- a/src/databricks/labs/ucx/assessment/workflows.py +++ b/src/databricks/labs/ucx/assessment/workflows.py @@ -8,7 +8,8 @@ class Assessment(Workflow): # pylint: disable=too-many-public-methods - def __init__(self): + def __init__(self, force_refresh: bool = False): + self._force_refresh = force_refresh super().__init__('assessment') @job_task @@ -18,14 +19,14 @@ def crawl_tables(self, ctx: RuntimeContext): `$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that cannot easily be migrated to Unity Catalog.""" - ctx.tables_crawler.snapshot(force_refresh=True) + ctx.tables_crawler.snapshot(force_refresh=self._force_refresh) @job_task def crawl_udfs(self, ctx: RuntimeContext): """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for issues with grants that cannot be migrated to Unit Catalog.""" - ctx.udfs_crawler.snapshot(force_refresh=True) + ctx.udfs_crawler.snapshot(force_refresh=self._force_refresh) @job_task(job_cluster="tacl") def setup_tacl(self, ctx: RuntimeContext): @@ -40,7 +41,7 @@ def crawl_grants(self, ctx: RuntimeContext): Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table ACLs enabled and available for retrieval.""" - ctx.grants_crawler.snapshot(force_refresh=True) + ctx.grants_crawler.snapshot(force_refresh=self._force_refresh) @job_task(depends_on=[crawl_tables]) def estimate_table_size_for_migration(self, ctx: RuntimeContext): @@ -48,7 +49,7 @@ def estimate_table_size_for_migration(self, ctx: RuntimeContext): "synced". These tables will have to be cloned in the migration process. Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes. The table size is a factor in deciding whether to clone these tables.""" - ctx.table_size_crawler.snapshot(force_refresh=True) + ctx.table_size_crawler.snapshot(force_refresh=self._force_refresh) @job_task def crawl_mounts(self, ctx: RuntimeContext): @@ -58,7 +59,7 @@ def crawl_mounts(self, ctx: RuntimeContext): The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently storing this information in the `$inventory.mounts` table. 
This is crucial for planning the migration.""" - ctx.mounts_crawler.snapshot(force_refresh=True) + ctx.mounts_crawler.snapshot(force_refresh=self._force_refresh) @job_task(depends_on=[crawl_mounts, crawl_tables]) def guess_external_locations(self, ctx: RuntimeContext): @@ -70,7 +71,7 @@ def guess_external_locations(self, ctx: RuntimeContext): - Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead - Scanning all these locations to identify folders that can act as shared path prefixes - These identified external locations will be created subsequently prior to the actual table migration""" - ctx.external_locations.snapshot(force_refresh=True) + ctx.external_locations.snapshot(force_refresh=self._force_refresh) @job_task def assess_jobs(self, ctx: RuntimeContext): @@ -83,7 +84,7 @@ def assess_jobs(self, ctx: RuntimeContext): - Clusters with incompatible Spark config tags - Clusters referencing DBFS locations in one or more config options """ - ctx.jobs_crawler.snapshot(force_refresh=True) + ctx.jobs_crawler.snapshot(force_refresh=self._force_refresh) @job_task def assess_clusters(self, ctx: RuntimeContext): @@ -96,7 +97,7 @@ def assess_clusters(self, ctx: RuntimeContext): - Clusters with incompatible spark config tags - Clusters referencing DBFS locations in one or more config options """ - ctx.clusters_crawler.snapshot(force_refresh=True) + ctx.clusters_crawler.snapshot(force_refresh=self._force_refresh) @job_task def assess_pipelines(self, ctx: RuntimeContext): @@ -109,7 +110,7 @@ def assess_pipelines(self, ctx: RuntimeContext): Subsequently, a list of all the pipelines with matching configurations are stored in the `$inventory.pipelines` table.""" - ctx.pipelines_crawler.snapshot(force_refresh=True) + ctx.pipelines_crawler.snapshot(force_refresh=self._force_refresh) @job_task def assess_incompatible_submit_runs(self, ctx: RuntimeContext): @@ -122,7 +123,7 @@ def assess_incompatible_submit_runs(self, ctx: RuntimeContext): It also combines several submit runs under a single pseudo_id based on hash of the submit run configuration. 
Subsequently, a list of all the incompatible runs with failures are stored in the `$inventory.submit_runs` table.""" - ctx.submit_runs_crawler.snapshot(force_refresh=True) + ctx.submit_runs_crawler.snapshot(force_refresh=self._force_refresh) @job_task def crawl_cluster_policies(self, ctx: RuntimeContext): @@ -133,7 +134,7 @@ def crawl_cluster_policies(self, ctx: RuntimeContext): Subsequently, a list of all the policies with matching configurations are stored in the `$inventory.policies` table.""" - ctx.policies_crawler.snapshot(force_refresh=True) + ctx.policies_crawler.snapshot(force_refresh=self._force_refresh) @job_task(cloud="azure") def assess_azure_service_principals(self, ctx: RuntimeContext): @@ -147,7 +148,7 @@ def assess_azure_service_principals(self, ctx: RuntimeContext): Subsequently, the list of all the Azure Service Principals referred in those configurations are saved in the `$inventory.azure_service_principals` table.""" if ctx.is_azure: - ctx.azure_service_principal_crawler.snapshot(force_refresh=True) + ctx.azure_service_principal_crawler.snapshot(force_refresh=self._force_refresh) @job_task def assess_global_init_scripts(self, ctx: RuntimeContext): @@ -156,7 +157,7 @@ def assess_global_init_scripts(self, ctx: RuntimeContext): It looks in: - the list of all the global init scripts are saved in the `$inventory.global_init_scripts` table.""" - ctx.global_init_scripts_crawler.snapshot(force_refresh=True) + ctx.global_init_scripts_crawler.snapshot(force_refresh=self._force_refresh) @job_task def workspace_listing(self, ctx: RuntimeContext): @@ -168,7 +169,7 @@ def workspace_listing(self, ctx: RuntimeContext): if not ctx.config.use_legacy_permission_migration: logger.info("Skipping workspace listing as legacy permission migration is disabled.") return - ctx.workspace_listing.snapshot(force_refresh=True) + ctx.workspace_listing.snapshot(force_refresh=self._force_refresh) @job_task(depends_on=[crawl_grants, workspace_listing]) def crawl_permissions(self, ctx: RuntimeContext): @@ -182,22 +183,22 @@ def crawl_permissions(self, ctx: RuntimeContext): return permission_manager = ctx.permission_manager permission_manager.reset() - permission_manager.snapshot(force_refresh=True) + permission_manager.snapshot(force_refresh=self._force_refresh) @job_task def crawl_groups(self, ctx: RuntimeContext): """Scans all groups for the local group migration scope""" - ctx.group_manager.snapshot(force_refresh=True) + ctx.group_manager.snapshot(force_refresh=self._force_refresh) @job_task def crawl_redash_dashboards(self, ctx: RuntimeContext): """Scans all Redash dashboards.""" - ctx.redash_crawler.snapshot(force_refresh=True) + ctx.redash_crawler.snapshot(force_refresh=self._force_refresh) @job_task def crawl_lakeview_dashboards(self, ctx: RuntimeContext): """Scans all Lakeview dashboards.""" - ctx.lakeview_crawler.snapshot(force_refresh=True) + ctx.lakeview_crawler.snapshot(force_refresh=self._force_refresh) @job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards]) def assess_dashboards(self, ctx: RuntimeContext): From 56329bf996c446097f2a20f091c4ff9d93014caf Mon Sep 17 00:00:00 2001 From: pritishpai Date: Thu, 26 Jun 2025 17:32:16 -0400 Subject: [PATCH 04/18] Add a parameter to the assessment job for force_refresh of data --- .../labs/ucx/assessment/workflows.py | 64 ++++++++++++------- src/databricks/labs/ucx/framework/tasks.py | 10 ++- .../labs/ucx/installer/workflows.py | 1 + 3 files changed, 51 insertions(+), 24 deletions(-) diff --git 
a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py index 64420ada94..b216fcbd36 100644 --- a/src/databricks/labs/ucx/assessment/workflows.py +++ b/src/databricks/labs/ucx/assessment/workflows.py @@ -1,5 +1,7 @@ import logging +from databricks.sdk.service.jobs import JobParameterDefinition + from databricks.labs.ucx.contexts.workflow_task import RuntimeContext from databricks.labs.ucx.framework.tasks import Workflow, job_task @@ -8,25 +10,27 @@ class Assessment(Workflow): # pylint: disable=too-many-public-methods - def __init__(self, force_refresh: bool = False): - self._force_refresh = force_refresh - super().__init__('assessment') + def __init__(self): + self._force_refresh_param = JobParameterDefinition(name="force_refresh", default=False) + super().__init__('assessment', [self._force_refresh_param]) @job_task def crawl_tables(self, ctx: RuntimeContext): """Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such as _database name_, _table name_, _table type_, _table location_, etc., in the Delta table named `$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata - stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that + stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that cannot easily be migrated to Unity Catalog.""" - ctx.tables_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.tables_crawler.snapshot(force_refresh=force_refresh) @job_task def crawl_udfs(self, ctx: RuntimeContext): """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for issues with grants that cannot be migrated to Unit Catalog.""" - ctx.udfs_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.udfs_crawler.snapshot(force_refresh=force_refresh) @job_task(job_cluster="tacl") def setup_tacl(self, ctx: RuntimeContext): @@ -41,7 +45,8 @@ def crawl_grants(self, ctx: RuntimeContext): Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table ACLs enabled and available for retrieval.""" - ctx.grants_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.grants_crawler.snapshot(force_refresh=force_refresh) @job_task(depends_on=[crawl_tables]) def estimate_table_size_for_migration(self, ctx: RuntimeContext): @@ -49,7 +54,8 @@ def estimate_table_size_for_migration(self, ctx: RuntimeContext): "synced". These tables will have to be cloned in the migration process. Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes. 
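The shift from PATCH 03 to PATCH 04 is worth spelling out: a constructor argument is frozen when the workflow object is built at deploy time, while a job parameter is re-read from each run. A compact, hypothetical illustration of the difference:

class DeployTimeFlag:
    """PATCH 03 style: the flag is baked in when the workflow is constructed."""

    def __init__(self, force_refresh: bool = False) -> None:
        self._force_refresh = force_refresh


def run_time_flag(named_parameters: dict[str, str]) -> bool:
    """PATCH 04 style: the flag is re-evaluated from each run's parameters."""
    return named_parameters.get("force_refresh", "False").lower() in ["true", "1"]


assert run_time_flag({"force_refresh": "True"}) is True
assert run_time_flag({}) is False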
The table size is a factor in deciding whether to clone these tables.""" - ctx.table_size_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.table_size_crawler.snapshot(force_refresh=force_refresh) @job_task def crawl_mounts(self, ctx: RuntimeContext): @@ -59,7 +65,8 @@ def crawl_mounts(self, ctx: RuntimeContext): The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently storing this information in the `$inventory.mounts` table. This is crucial for planning the migration.""" - ctx.mounts_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.mounts_crawler.snapshot(force_refresh=force_refresh) @job_task(depends_on=[crawl_mounts, crawl_tables]) def guess_external_locations(self, ctx: RuntimeContext): @@ -71,7 +78,8 @@ def guess_external_locations(self, ctx: RuntimeContext): - Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead - Scanning all these locations to identify folders that can act as shared path prefixes - These identified external locations will be created subsequently prior to the actual table migration""" - ctx.external_locations.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.external_locations.snapshot(force_refresh=force_refresh) @job_task def assess_jobs(self, ctx: RuntimeContext): @@ -84,7 +92,8 @@ def assess_jobs(self, ctx: RuntimeContext): - Clusters with incompatible Spark config tags - Clusters referencing DBFS locations in one or more config options """ - ctx.jobs_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.jobs_crawler.snapshot(force_refresh=force_refresh) @job_task def assess_clusters(self, ctx: RuntimeContext): @@ -97,7 +106,8 @@ def assess_clusters(self, ctx: RuntimeContext): - Clusters with incompatible spark config tags - Clusters referencing DBFS locations in one or more config options """ - ctx.clusters_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.clusters_crawler.snapshot(force_refresh=force_refresh) @job_task def assess_pipelines(self, ctx: RuntimeContext): @@ -110,7 +120,8 @@ def assess_pipelines(self, ctx: RuntimeContext): Subsequently, a list of all the pipelines with matching configurations are stored in the `$inventory.pipelines` table.""" - ctx.pipelines_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.pipelines_crawler.snapshot(force_refresh=force_refresh) @job_task def assess_incompatible_submit_runs(self, ctx: RuntimeContext): @@ -123,7 +134,8 @@ def assess_incompatible_submit_runs(self, ctx: RuntimeContext): It also combines several submit runs under a single pseudo_id based on hash of the submit run configuration. 
Subsequently, a list of all the incompatible runs with failures are stored in the `$inventory.submit_runs` table.""" - ctx.submit_runs_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.submit_runs_crawler.snapshot(force_refresh=force_refresh) @job_task def crawl_cluster_policies(self, ctx: RuntimeContext): @@ -134,7 +146,8 @@ def crawl_cluster_policies(self, ctx: RuntimeContext): Subsequently, a list of all the policies with matching configurations are stored in the `$inventory.policies` table.""" - ctx.policies_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.policies_crawler.snapshot(force_refresh=force_refresh) @job_task(cloud="azure") def assess_azure_service_principals(self, ctx: RuntimeContext): @@ -148,7 +161,8 @@ def assess_azure_service_principals(self, ctx: RuntimeContext): Subsequently, the list of all the Azure Service Principals referred in those configurations are saved in the `$inventory.azure_service_principals` table.""" if ctx.is_azure: - ctx.azure_service_principal_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.azure_service_principal_crawler.snapshot(force_refresh=force_refresh) @job_task def assess_global_init_scripts(self, ctx: RuntimeContext): @@ -157,7 +171,8 @@ def assess_global_init_scripts(self, ctx: RuntimeContext): It looks in: - the list of all the global init scripts are saved in the `$inventory.global_init_scripts` table.""" - ctx.global_init_scripts_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.global_init_scripts_crawler.snapshot(force_refresh=force_refresh) @job_task def workspace_listing(self, ctx: RuntimeContext): @@ -169,7 +184,8 @@ def workspace_listing(self, ctx: RuntimeContext): if not ctx.config.use_legacy_permission_migration: logger.info("Skipping workspace listing as legacy permission migration is disabled.") return - ctx.workspace_listing.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.workspace_listing.snapshot(force_refresh=force_refresh) @job_task(depends_on=[crawl_grants, workspace_listing]) def crawl_permissions(self, ctx: RuntimeContext): @@ -183,22 +199,26 @@ def crawl_permissions(self, ctx: RuntimeContext): return permission_manager = ctx.permission_manager permission_manager.reset() - permission_manager.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + permission_manager.snapshot(force_refresh=force_refresh) @job_task def crawl_groups(self, ctx: RuntimeContext): """Scans all groups for the local group migration scope""" - ctx.group_manager.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.group_manager.snapshot(force_refresh=force_refresh) @job_task def crawl_redash_dashboards(self, ctx: RuntimeContext): """Scans all Redash dashboards.""" - ctx.redash_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.redash_crawler.snapshot(force_refresh=force_refresh) 
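Job parameters reach each task as strings, hence the string-to-bool parsing repeated in every task body above. A small helper — hypothetical, not part of this change — would capture the idiom once (using the set membership test that PATCH 05 below also adopts):

def is_force_refresh(named_parameters: dict[str, str]) -> bool:
    """Interpret the force_refresh job parameter, defaulting to False."""
    return named_parameters.get("force_refresh", "False").lower() in {"true", "1"}


assert is_force_refresh({"force_refresh": "1"}) is True
assert is_force_refresh({"force_refresh": "no"}) is False
assert is_force_refresh({}) is False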
@job_task def crawl_lakeview_dashboards(self, ctx: RuntimeContext): """Scans all Lakeview dashboards.""" - ctx.lakeview_crawler.snapshot(force_refresh=self._force_refresh) + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + ctx.lakeview_crawler.snapshot(force_refresh=force_refresh) @job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards]) def assess_dashboards(self, ctx: RuntimeContext): diff --git a/src/databricks/labs/ucx/framework/tasks.py b/src/databricks/labs/ucx/framework/tasks.py index 46ddaf75a3..6840a473ea 100644 --- a/src/databricks/labs/ucx/framework/tasks.py +++ b/src/databricks/labs/ucx/framework/tasks.py @@ -6,7 +6,7 @@ from databricks.labs.lsql.backends import SqlBackend from databricks.sdk import WorkspaceClient from databricks.sdk.core import Config -from databricks.sdk.service.jobs import CronSchedule +from databricks.sdk.service.jobs import CronSchedule, JobParameterDefinition from databricks.labs.ucx.config import WorkspaceConfig @@ -65,8 +65,9 @@ def parse_args(*argv) -> dict[str, str]: class Workflow: - def __init__(self, name: str): + def __init__(self, name: str, named_parameters: list[JobParameterDefinition] | None = None): self._name = name + self._named_parameters = named_parameters @property def name(self): @@ -77,6 +78,11 @@ def schedule(self) -> CronSchedule | None: """The default (cron) schedule for this workflow, or None if it is not scheduled.""" return None + @property + def parameters(self) -> list[JobParameterDefinition] | None: + """Named parameters for this workflow, or None if there are no parameters.""" + return self._named_parameters + def tasks(self) -> Iterable[Task]: # return __task__ from every method in this class that has this attribute for attr in dir(self): diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py index 7779a25851..2a0bae1045 100644 --- a/src/databricks/labs/ucx/installer/workflows.py +++ b/src/databricks/labs/ucx/installer/workflows.py @@ -855,6 +855,7 @@ def _job_settings(self, workflow_name: str, remote_wheels: list[str]) -> dict[st "email_notifications": email_notifications, "schedule": workflow.schedule, "tasks": job_tasks, + "parameters": workflow.parameters, } def _job_task(self, task: Task, remote_wheels: list[str]) -> jobs.Task: From b68b6d400d042ff356f4860cfde9b457d84b7339 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Thu, 26 Jun 2025 17:36:00 -0400 Subject: [PATCH 05/18] Use set for membership test --- .../labs/ucx/assessment/workflows.py | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py index b216fcbd36..7f1d97b149 100644 --- a/src/databricks/labs/ucx/assessment/workflows.py +++ b/src/databricks/labs/ucx/assessment/workflows.py @@ -21,7 +21,7 @@ def crawl_tables(self, ctx: RuntimeContext): `$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. 
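With Workflow now accepting named parameter definitions and the installer serializing workflow.parameters into the job settings, the flag is declared once at deploy time and can be overridden per run. A condensed sketch of the plumbing — JobParameterDefinition is the real databricks-sdk class used in the hunks above; the dict is a trimmed stand-in for the generated job settings:

from databricks.sdk.service.jobs import JobParameterDefinition

# Declared once on the workflow at install time...
parameters = [JobParameterDefinition(name="force_refresh", default=False)]

# ...carried into the deployed job via the new `parameters` key...
job_settings = {"name": "assessment", "parameters": parameters}

# ...and overridable per run; each task then reads the *string* value back
# through ctx.named_parameters and parses it, as the hunks above show.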
The metadata stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that cannot easily be migrated to Unity Catalog.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.tables_crawler.snapshot(force_refresh=force_refresh) @job_task @@ -29,7 +29,7 @@ def crawl_udfs(self, ctx: RuntimeContext): """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for issues with grants that cannot be migrated to Unit Catalog.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.udfs_crawler.snapshot(force_refresh=force_refresh) @job_task(job_cluster="tacl") @@ -45,7 +45,7 @@ def crawl_grants(self, ctx: RuntimeContext): Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table ACLs enabled and available for retrieval.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.grants_crawler.snapshot(force_refresh=force_refresh) @job_task(depends_on=[crawl_tables]) @@ -54,7 +54,7 @@ def estimate_table_size_for_migration(self, ctx: RuntimeContext): "synced". These tables will have to be cloned in the migration process. Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes. The table size is a factor in deciding whether to clone these tables.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.table_size_crawler.snapshot(force_refresh=force_refresh) @job_task @@ -65,7 +65,7 @@ def crawl_mounts(self, ctx: RuntimeContext): The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently storing this information in the `$inventory.mounts` table. 
This is crucial for planning the migration.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.mounts_crawler.snapshot(force_refresh=force_refresh) @job_task(depends_on=[crawl_mounts, crawl_tables]) @@ -78,7 +78,7 @@ def guess_external_locations(self, ctx: RuntimeContext): - Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead - Scanning all these locations to identify folders that can act as shared path prefixes - These identified external locations will be created subsequently prior to the actual table migration""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.external_locations.snapshot(force_refresh=force_refresh) @job_task @@ -92,7 +92,7 @@ def assess_jobs(self, ctx: RuntimeContext): - Clusters with incompatible Spark config tags - Clusters referencing DBFS locations in one or more config options """ - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.jobs_crawler.snapshot(force_refresh=force_refresh) @job_task @@ -106,7 +106,7 @@ def assess_clusters(self, ctx: RuntimeContext): - Clusters with incompatible spark config tags - Clusters referencing DBFS locations in one or more config options """ - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.clusters_crawler.snapshot(force_refresh=force_refresh) @job_task @@ -120,7 +120,7 @@ def assess_pipelines(self, ctx: RuntimeContext): Subsequently, a list of all the pipelines with matching configurations are stored in the `$inventory.pipelines` table.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.pipelines_crawler.snapshot(force_refresh=force_refresh) @job_task @@ -134,7 +134,7 @@ def assess_incompatible_submit_runs(self, ctx: RuntimeContext): It also combines several submit runs under a single pseudo_id based on hash of the submit run configuration. 
Subsequently, a list of all the incompatible runs with failures are stored in the `$inventory.submit_runs` table.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.submit_runs_crawler.snapshot(force_refresh=force_refresh) @job_task @@ -146,7 +146,7 @@ def crawl_cluster_policies(self, ctx: RuntimeContext): Subsequently, a list of all the policies with matching configurations are stored in the `$inventory.policies` table.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.policies_crawler.snapshot(force_refresh=force_refresh) @job_task(cloud="azure") @@ -161,7 +161,7 @@ def assess_azure_service_principals(self, ctx: RuntimeContext): Subsequently, the list of all the Azure Service Principals referred in those configurations are saved in the `$inventory.azure_service_principals` table.""" if ctx.is_azure: - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.azure_service_principal_crawler.snapshot(force_refresh=force_refresh) @job_task @@ -171,7 +171,7 @@ def assess_global_init_scripts(self, ctx: RuntimeContext): It looks in: - the list of all the global init scripts are saved in the `$inventory.global_init_scripts` table.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.global_init_scripts_crawler.snapshot(force_refresh=force_refresh) @job_task @@ -184,7 +184,7 @@ def workspace_listing(self, ctx: RuntimeContext): if not ctx.config.use_legacy_permission_migration: logger.info("Skipping workspace listing as legacy permission migration is disabled.") return - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.workspace_listing.snapshot(force_refresh=force_refresh) @job_task(depends_on=[crawl_grants, workspace_listing]) @@ -199,25 +199,25 @@ def crawl_permissions(self, ctx: RuntimeContext): return permission_manager = ctx.permission_manager permission_manager.reset() - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} permission_manager.snapshot(force_refresh=force_refresh) @job_task def crawl_groups(self, ctx: RuntimeContext): """Scans all groups for the local group migration scope""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.group_manager.snapshot(force_refresh=force_refresh) @job_task def crawl_redash_dashboards(self, ctx: RuntimeContext): """Scans all Redash dashboards.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.redash_crawler.snapshot(force_refresh=force_refresh) @job_task def crawl_lakeview_dashboards(self, ctx: RuntimeContext): """Scans all Lakeview 
dashboards.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"] + force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} ctx.lakeview_crawler.snapshot(force_refresh=force_refresh) @job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards]) From 03dacfbfde332584e17ef8ad6da35417d18326d9 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Mon, 30 Jun 2025 15:38:41 -0400 Subject: [PATCH 06/18] Fetching permissions returns groups and users that have access. --- tests/integration/framework/test_owners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/framework/test_owners.py b/tests/integration/framework/test_owners.py index c8fa0b6fdf..d624d13467 100644 --- a/tests/integration/framework/test_owners.py +++ b/tests/integration/framework/test_owners.py @@ -70,7 +70,7 @@ def test_notebook_owner(make_notebook, make_notebook_permissions, make_group, ws name = notebook_ownership.owner_of(notebook) my_user = ws.current_user.me() - assert name == my_user.user_name + assert name in [my_user.user_name, new_group.display_name] def test_file_owner(make_workspace_file, ws): From f2a5e9ea211fee410c13e90833d303c3497c68e3 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Tue, 1 Jul 2025 16:51:44 -0400 Subject: [PATCH 07/18] Update readme to reflect parameter change --- docs/ucx/docs/reference/workflows/index.mdx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/ucx/docs/reference/workflows/index.mdx b/docs/ucx/docs/reference/workflows/index.mdx index c8f58761ec..fb5c0a420c 100644 --- a/docs/ucx/docs/reference/workflows/index.mdx +++ b/docs/ucx/docs/reference/workflows/index.mdx @@ -98,7 +98,8 @@ See [this guide](/docs/reference/assessment) for more details. Proceed to the [group migration workflow](/docs/reference/workflows#group-migration-workflow) below or go back to the [migration process diagram](/docs/process/). -> **⚠️ Caution:** Re-running the UCX assessment workflow will **overwrite existing results**. For large workspaces, this process can take a significant amount of time and consume substantial resources. Only re-run if you are certain it is necessary. +> ⚠️ Caution: To fully refresh the UCX assessment workflow and overwrite existing results, set the force_refresh parameter to true (or True). + For large workspaces, this process can be time-consuming and resource-intensive. Only use this option if a complete reassessment is absolutely necessary. ## Group migration workflow From be17d8593894a3ce6de3916ef20be62e4dc57ff1 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Thu, 3 Jul 2025 10:31:30 -0400 Subject: [PATCH 08/18] Add force_refresh flag to ensure-assessment-run for full refresh of assessment --- labs.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/labs.yml b/labs.yml index 339f3f6fed..97f805aced 100644 --- a/labs.yml +++ b/labs.yml @@ -87,6 +87,9 @@ commands: - name: run-as-collection description: (Optional) Whether to check (and run if necessary) the assessment for the collection of workspaces with ucx installed. Default is false. + - name: force_refresh + description: (Optional) Force a full refresh of the assessment job, even if it was run recently. + Default is false. 
- name: update-migration-progress description: trigger the `migration-progress-experimental` job to refresh the inventory that tracks the workspace From b53a1166a3df6555effda98e1c30b8f3c20c1500 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Thu, 3 Jul 2025 10:32:02 -0400 Subject: [PATCH 09/18] Fetch flag value of force_refresh for assessment rerun --- src/databricks/labs/ucx/cli.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 0240d426a8..8847e94fce 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -219,15 +219,16 @@ def validate_external_locations( @ucx.command -def ensure_assessment_run(w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None): +def ensure_assessment_run(w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None, **named_parameters): """ensure the assessment job was run on a workspace""" workspace_contexts = _get_workspace_contexts(w, a, run_as_collection) + force_refresh = named_parameters.get("force_refresh", "false").lower() == "true" for ctx in workspace_contexts: workspace_id = ctx.workspace_client.get_workspace_id() logger.info(f"Checking assessment workflow in workspace: {workspace_id}") deployed_workflows = ctx.deployed_workflows # Note: will block if the workflow is already underway but not completed. - if deployed_workflows.validate_step("assessment"): + if deployed_workflows.validate_step("assessment") and not force_refresh: logger.info(f"The assessment workflow has successfully completed in workspace: {workspace_id}") else: logger.info(f"Starting assessment workflow in workspace: {workspace_id}") From 118b2f05e2f6b70ca104b9ab2f6bc22c1183117f Mon Sep 17 00:00:00 2001 From: pritishpai Date: Thu, 3 Jul 2025 16:16:03 -0400 Subject: [PATCH 10/18] Add integration test, fetch parameter from current job run --- .../labs/ucx/assessment/workflows.py | 21 +++++++++++++++---- .../labs/ucx/installer/workflows.py | 5 +++-- .../integration/assessment/test_workflows.py | 9 ++++++++ 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py index 7f1d97b149..5b8ea9c9ee 100644 --- a/src/databricks/labs/ucx/assessment/workflows.py +++ b/src/databricks/labs/ucx/assessment/workflows.py @@ -1,4 +1,5 @@ import logging +from functools import cached_property from databricks.sdk.service.jobs import JobParameterDefinition @@ -11,8 +12,21 @@ class Assessment(Workflow): # pylint: disable=too-many-public-methods def __init__(self): - self._force_refresh_param = JobParameterDefinition(name="force_refresh", default=False) - super().__init__('assessment', [self._force_refresh_param]) + super().__init__('assessment', [JobParameterDefinition(name="force_refresh", default=False)]) + + @staticmethod + def _get_force_refresh(ctx: RuntimeContext) -> bool: + """Extracts the force_refresh parameter from the job run parameters.""" + force_refresh = False + job_id = ctx.install_state.jobs["assessment"] + job_runs = [job_run for job_run in ctx.workspace_client.jobs.list_runs(active_only=True, job_id=job_id)] + job_parameters = job_runs[0].job_parameters if job_runs else [] + for job_parameter in job_parameters: + if job_parameter.name == "force_refresh" and job_parameter.value is not None: + force_refresh = job_parameter.value.lower() in {"true", "1"} + logger.info("Found force_refresh parameter in job run: %s", 
force_refresh) + break + return force_refresh @job_task def crawl_tables(self, ctx: RuntimeContext): @@ -21,8 +35,7 @@ def crawl_tables(self, ctx: RuntimeContext): `$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that cannot easily be migrated to Unity Catalog.""" - force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in {"true", "1"} - ctx.tables_crawler.snapshot(force_refresh=force_refresh) + ctx.tables_crawler.snapshot(force_refresh=self._get_force_refresh(ctx)) @job_task def crawl_udfs(self, ctx: RuntimeContext): diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py index 2a0bae1045..25d4e38c75 100644 --- a/src/databricks/labs/ucx/installer/workflows.py +++ b/src/databricks/labs/ucx/installer/workflows.py @@ -249,13 +249,14 @@ def __init__(self, ws: WorkspaceClient, install_state: InstallState): self._ws = ws self._install_state = install_state - def run_workflow(self, step: str, skip_job_wait: bool = False, max_wait: timedelta = timedelta(minutes=20)) -> int: + def run_workflow(self, step: str, skip_job_wait: bool = False, max_wait: timedelta = timedelta(minutes=20), + named_parameters: dict[str, str] | None = None ) -> int: # this dunder variable is hiding this method from tracebacks, making it cleaner # for the user to see the actual error without too much noise. __tracebackhide__ = True # pylint: disable=unused-variable job_id = int(self._install_state.jobs[step]) logger.debug(f"starting {step} job: {self._ws.config.host}#job/{job_id}") - job_initial_run = self._ws.jobs.run_now(job_id) + job_initial_run = self._ws.jobs.run_now(job_id, job_parameters=named_parameters) run_id = job_initial_run.run_id if not run_id: raise NotFound(f"job run not found for {step}") diff --git a/tests/integration/assessment/test_workflows.py b/tests/integration/assessment/test_workflows.py index d9d0e53e91..9647cf0eb0 100644 --- a/tests/integration/assessment/test_workflows.py +++ b/tests/integration/assessment/test_workflows.py @@ -49,3 +49,12 @@ def test_running_real_assessment_job( query = f"SELECT * FROM {installation_ctx.inventory_database}.workflow_problems" workflow_problems_without_path = [problem for problem in sql_backend.fetch(query) if problem["path"] == "UNKNOWN"] assert not workflow_problems_without_path + + post_assessment_table = installation_ctx.make_table(schema_name=source_schema.name, ctas="SELECT 1 AS one") + installation_ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=False, named_parameters={"force_refresh": "true"}) + assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}" + expected_tables = {managed_table.name, external_table.name, tmp_table.name, view.name, non_delta.name, post_assessment_table.name} + + query = f"SELECT * FROM {installation_ctx.inventory_database}.tables" + tables_after_assessment_rerun = [table.name for table in sql_backend.fetch(query)] + assert tables_after_assessment_rerun == expected_tables From cd9b811e2960684ed4e65dd8ab034c4d10f1f0e4 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Thu, 3 Jul 2025 16:21:37 -0400 Subject: [PATCH 11/18] Update cli command --- src/databricks/labs/ucx/cli.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 8847e94fce..c1fad2ec64 100644 --- 
a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -230,11 +230,12 @@ def ensure_assessment_run(w: WorkspaceClient, run_as_collection: bool = False, a # Note: will block if the workflow is already underway but not completed. if deployed_workflows.validate_step("assessment") and not force_refresh: logger.info(f"The assessment workflow has successfully completed in workspace: {workspace_id}") + elif force_refresh: + logger.info(f"Re-running assessment workflow in workspace: {workspace_id}") + deployed_workflows.run_workflow("assessment", skip_job_wait=run_as_collection, named_parameters={"force_refresh": "true"}) else: logger.info(f"Starting assessment workflow in workspace: {workspace_id}") # If running for a collection, don't wait for each assessment job to finish as that will take a long time. - deployed_workflows.run_workflow("assessment", skip_job_wait=run_as_collection) - @ucx.command def update_migration_progress( From 36b22554e2513da0cf9e7f45243d794f4d5f8586 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Fri, 4 Jul 2025 10:30:44 -0400 Subject: [PATCH 12/18] Fix fmt --- src/databricks/labs/ucx/assessment/workflows.py | 7 ++++--- src/databricks/labs/ucx/cli.py | 9 +++++++-- src/databricks/labs/ucx/installer/workflows.py | 9 +++++++-- tests/integration/assessment/test_workflows.py | 13 +++++++++++-- 4 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py index 5b8ea9c9ee..3abe576756 100644 --- a/src/databricks/labs/ucx/assessment/workflows.py +++ b/src/databricks/labs/ucx/assessment/workflows.py @@ -1,5 +1,4 @@ import logging -from functools import cached_property from databricks.sdk.service.jobs import JobParameterDefinition @@ -19,12 +18,14 @@ def _get_force_refresh(ctx: RuntimeContext) -> bool: """Extracts the force_refresh parameter from the job run parameters.""" force_refresh = False job_id = ctx.install_state.jobs["assessment"] - job_runs = [job_run for job_run in ctx.workspace_client.jobs.list_runs(active_only=True, job_id=job_id)] + job_runs = list(ctx.workspace_client.jobs.list_runs(active_only=True, job_id=job_id)) job_parameters = job_runs[0].job_parameters if job_runs else [] + if not job_parameters: + return False for job_parameter in job_parameters: if job_parameter.name == "force_refresh" and job_parameter.value is not None: force_refresh = job_parameter.value.lower() in {"true", "1"} - logger.info("Found force_refresh parameter in job run: %s", force_refresh) + logger.info(f"Found force_refresh parameter in job run: {force_refresh}") break return force_refresh diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index c1fad2ec64..5a28f9693b 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -219,7 +219,9 @@ def validate_external_locations( @ucx.command -def ensure_assessment_run(w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None, **named_parameters): +def ensure_assessment_run( + w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None, **named_parameters +): """ensure the assessment job was run on a workspace""" workspace_contexts = _get_workspace_contexts(w, a, run_as_collection) force_refresh = named_parameters.get("force_refresh", "false").lower() == "true" @@ -232,11 +234,14 @@ def ensure_assessment_run(w: WorkspaceClient, run_as_collection: bool = False, a logger.info(f"The assessment workflow has successfully completed in 
workspace: {workspace_id}") elif force_refresh: logger.info(f"Re-running assessment workflow in workspace: {workspace_id}") - deployed_workflows.run_workflow("assessment", skip_job_wait=run_as_collection, named_parameters={"force_refresh": "true"}) + deployed_workflows.run_workflow( + "assessment", skip_job_wait=run_as_collection, named_parameters={"force_refresh": "true"} + ) else: logger.info(f"Starting assessment workflow in workspace: {workspace_id}") # If running for a collection, don't wait for each assessment job to finish as that will take a long time. + @ucx.command def update_migration_progress( w: WorkspaceClient, diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py index 25d4e38c75..e1e6fed26c 100644 --- a/src/databricks/labs/ucx/installer/workflows.py +++ b/src/databricks/labs/ucx/installer/workflows.py @@ -249,8 +249,13 @@ def __init__(self, ws: WorkspaceClient, install_state: InstallState): self._ws = ws self._install_state = install_state - def run_workflow(self, step: str, skip_job_wait: bool = False, max_wait: timedelta = timedelta(minutes=20), - named_parameters: dict[str, str] | None = None ) -> int: + def run_workflow( + self, + step: str, + skip_job_wait: bool = False, + max_wait: timedelta = timedelta(minutes=20), + named_parameters: dict[str, str] | None = None, + ) -> int: # this dunder variable is hiding this method from tracebacks, making it cleaner # for the user to see the actual error without too much noise. __tracebackhide__ = True # pylint: disable=unused-variable diff --git a/tests/integration/assessment/test_workflows.py b/tests/integration/assessment/test_workflows.py index 9647cf0eb0..e960787ad1 100644 --- a/tests/integration/assessment/test_workflows.py +++ b/tests/integration/assessment/test_workflows.py @@ -51,9 +51,18 @@ def test_running_real_assessment_job( assert not workflow_problems_without_path post_assessment_table = installation_ctx.make_table(schema_name=source_schema.name, ctas="SELECT 1 AS one") - installation_ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=False, named_parameters={"force_refresh": "true"}) + installation_ctx.deployed_workflows.run_workflow( + workflow, skip_job_wait=False, named_parameters={"force_refresh": "true"} + ) assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}" - expected_tables = {managed_table.name, external_table.name, tmp_table.name, view.name, non_delta.name, post_assessment_table.name} + expected_tables = { + managed_table.name, + external_table.name, + tmp_table.name, + view.name, + non_delta.name, + post_assessment_table.name, + } query = f"SELECT * FROM {installation_ctx.inventory_database}.tables" tables_after_assessment_rerun = [table.name for table in sql_backend.fetch(query)] From a0e03258a59911d851561f78061f86454b0459a4 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Fri, 4 Jul 2025 12:11:43 -0400 Subject: [PATCH 13/18] Add job_parameters to the test run_now to imitate adding it to run_workflow --- tests/unit/install/test_install.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/unit/install/test_install.py b/tests/unit/install/test_install.py index 343f0e93ed..9c9c70ee11 100644 --- a/tests/unit/install/test_install.py +++ b/tests/unit/install/test_install.py @@ -180,8 +180,9 @@ def test_writeable_dbfs(ws, tmp_path, mock_installation) -> None: def test_run_workflow_creates_proper_failure(ws, mocker, mock_installation_with_jobs): - def 
run_now(job_id): + def run_now(job_id, job_parameters: dict[str, Any] | None = None): assert job_id == 123 + assert job_parameters is None def result(): raise OperationFailed(...) @@ -215,8 +216,9 @@ def result(): def test_run_workflow_run_id_not_found(ws, mocker, mock_installation_with_jobs): - def run_now(job_id): + def run_now(job_id, job_parameters: dict[str, Any] | None = None): assert job_id == 123 + assert job_parameters is None def result(): raise OperationFailed(...) @@ -246,8 +248,9 @@ def result(): def test_run_workflow_creates_failure_from_mapping(ws, mocker, mock_installation, mock_installation_with_jobs): - def run_now(job_id): + def run_now(job_id, job_parameters: dict[str, Any] | None = None): assert job_id == 123 + assert job_parameters is None def result(): raise OperationFailed(...) @@ -281,8 +284,9 @@ def result(): def test_run_workflow_creates_failure_many_error(ws, mocker, mock_installation_with_jobs): - def run_now(job_id): + def run_now(job_id, job_parameters: dict[str, Any] | None = None): assert job_id == 123 + assert job_parameters is None def result(): raise OperationFailed(...) From a14165d2683f763709865ef4b59d44d288c3482a Mon Sep 17 00:00:00 2001 From: pritishpai Date: Sat, 5 Jul 2025 11:14:03 -0400 Subject: [PATCH 14/18] Indicate job_parameters argument is intentionally unused --- tests/unit/install/test_install.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/tests/unit/install/test_install.py b/tests/unit/install/test_install.py index 9c9c70ee11..3c69fdd7be 100644 --- a/tests/unit/install/test_install.py +++ b/tests/unit/install/test_install.py @@ -180,9 +180,8 @@ def test_writeable_dbfs(ws, tmp_path, mock_installation) -> None: def test_run_workflow_creates_proper_failure(ws, mocker, mock_installation_with_jobs): - def run_now(job_id, job_parameters: dict[str, Any] | None = None): + def run_now(job_id, _job_parameters: dict[str, Any] | None = None): assert job_id == 123 - assert job_parameters is None def result(): raise OperationFailed(...) @@ -216,9 +215,8 @@ def result(): def test_run_workflow_run_id_not_found(ws, mocker, mock_installation_with_jobs): - def run_now(job_id, job_parameters: dict[str, Any] | None = None): + def run_now(job_id, _job_parameters: dict[str, Any] | None = None): assert job_id == 123 - assert job_parameters is None def result(): raise OperationFailed(...) @@ -248,9 +246,8 @@ def result(): def test_run_workflow_creates_failure_from_mapping(ws, mocker, mock_installation, mock_installation_with_jobs): - def run_now(job_id, job_parameters: dict[str, Any] | None = None): + def run_now(job_id, _job_parameters: dict[str, Any] | None = None): assert job_id == 123 - assert job_parameters is None def result(): raise OperationFailed(...) @@ -284,9 +281,8 @@ def result(): def test_run_workflow_creates_failure_many_error(ws, mocker, mock_installation_with_jobs): - def run_now(job_id, job_parameters: dict[str, Any] | None = None): + def run_now(job_id, _job_parameters: dict[str, Any] | None = None): assert job_id == 123 - assert job_parameters is None def result(): raise OperationFailed(...) 
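Alongside the signature updates above, a test that pins the new pass-through behaviour would look roughly like the following — a sketch in the style of tests/unit/installer/test_workflows.py, with the fixture, state construction, and 123/456 IDs assumed from the existing tests:

from unittest.mock import create_autospec

from databricks.labs.blueprint.installer import InstallState
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.installer.workflows import DeployedWorkflows


def test_run_workflow_forwards_named_parameters(mock_installation) -> None:
    # Fixture name and job/run IDs follow the existing tests in this file.
    ws = create_autospec(WorkspaceClient)
    ws.jobs.run_now.return_value.run_id = 456
    workflows = DeployedWorkflows(ws, InstallState.from_installation(mock_installation))

    workflows.run_workflow("test", skip_job_wait=True, named_parameters={"force_refresh": "true"})

    ws.jobs.run_now.assert_called_once_with(123, job_parameters={"force_refresh": "true"})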
From 4d6f377db485426dc65e9d645938bdf3d090c846 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Mon, 7 Jul 2025 10:53:24 -0400 Subject: [PATCH 15/18] Fix error due to list to set comparison --- tests/integration/assessment/test_workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/assessment/test_workflows.py b/tests/integration/assessment/test_workflows.py index e960787ad1..8cd009525f 100644 --- a/tests/integration/assessment/test_workflows.py +++ b/tests/integration/assessment/test_workflows.py @@ -65,5 +65,5 @@ def test_running_real_assessment_job( } query = f"SELECT * FROM {installation_ctx.inventory_database}.tables" - tables_after_assessment_rerun = [table.name for table in sql_backend.fetch(query)] + tables_after_assessment_rerun = set([table.name for table in sql_backend.fetch(query)]) assert tables_after_assessment_rerun == expected_tables From 2f1db1ebbb3d7b3dd1da29c97a96b350385f18e9 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Mon, 7 Jul 2025 11:01:46 -0400 Subject: [PATCH 16/18] Use set comprehension directly --- tests/integration/assessment/test_workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/assessment/test_workflows.py b/tests/integration/assessment/test_workflows.py index 8cd009525f..a6448a30c0 100644 --- a/tests/integration/assessment/test_workflows.py +++ b/tests/integration/assessment/test_workflows.py @@ -65,5 +65,5 @@ def test_running_real_assessment_job( } query = f"SELECT * FROM {installation_ctx.inventory_database}.tables" - tables_after_assessment_rerun = set([table.name for table in sql_backend.fetch(query)]) + tables_after_assessment_rerun = {table.name for table in sql_backend.fetch(query)} assert tables_after_assessment_rerun == expected_tables From 6c5d996f3940961a27d124981fe8fc9dccb82693 Mon Sep 17 00:00:00 2001 From: pritishpai Date: Mon, 7 Jul 2025 11:25:54 -0400 Subject: [PATCH 17/18] Fix unit tests and cli command --- src/databricks/labs/ucx/cli.py | 1 + tests/unit/install/test_install.py | 12 ++++++++---- tests/unit/installer/test_workflows.py | 8 ++++---- tests/unit/test_cli.py | 8 ++++---- 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 5a28f9693b..442a4f552a 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -239,6 +239,7 @@ def ensure_assessment_run( ) else: logger.info(f"Starting assessment workflow in workspace: {workspace_id}") + deployed_workflows.run_workflow("assessment", skip_job_wait=run_as_collection) # If running for a collection, don't wait for each assessment job to finish as that will take a long time. diff --git a/tests/unit/install/test_install.py b/tests/unit/install/test_install.py index 3c69fdd7be..9c9c70ee11 100644 --- a/tests/unit/install/test_install.py +++ b/tests/unit/install/test_install.py @@ -180,8 +180,9 @@ def test_writeable_dbfs(ws, tmp_path, mock_installation) -> None: def test_run_workflow_creates_proper_failure(ws, mocker, mock_installation_with_jobs): - def run_now(job_id, _job_parameters: dict[str, Any] | None = None): + def run_now(job_id, job_parameters: dict[str, Any] | None = None): assert job_id == 123 + assert job_parameters is None def result(): raise OperationFailed(...) 
@@ -215,8 +216,9 @@ def result(): def test_run_workflow_run_id_not_found(ws, mocker, mock_installation_with_jobs): - def run_now(job_id, _job_parameters: dict[str, Any] | None = None): + def run_now(job_id, job_parameters: dict[str, Any] | None = None): assert job_id == 123 + assert job_parameters is None def result(): raise OperationFailed(...) @@ -246,8 +248,9 @@ def result(): def test_run_workflow_creates_failure_from_mapping(ws, mocker, mock_installation, mock_installation_with_jobs): - def run_now(job_id, _job_parameters: dict[str, Any] | None = None): + def run_now(job_id, job_parameters: dict[str, Any] | None = None): assert job_id == 123 + assert job_parameters is None def result(): raise OperationFailed(...) @@ -281,8 +284,9 @@ def result(): def test_run_workflow_creates_failure_many_error(ws, mocker, mock_installation_with_jobs): - def run_now(job_id, _job_parameters: dict[str, Any] | None = None): + def run_now(job_id, job_parameters: dict[str, Any] | None = None): assert job_id == 123 + assert job_parameters is None def result(): raise OperationFailed(...) diff --git a/tests/unit/installer/test_workflows.py b/tests/unit/installer/test_workflows.py index 3cd7a196c4..66bc3edee7 100644 --- a/tests/unit/installer/test_workflows.py +++ b/tests/unit/installer/test_workflows.py @@ -87,7 +87,7 @@ def test_run_workflow(mock_installation) -> None: run_id = workflows.run_workflow("test") assert run_id == 456 - ws.jobs.run_now.assert_called_once_with(123) + ws.jobs.run_now.assert_called_once_with(123, job_parameters=None) ws.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once_with(run_id=456, timeout=timedelta(minutes=20)) @@ -101,7 +101,7 @@ def test_run_workflow_skip_job_wait(mock_installation) -> None: run_id = workflows.run_workflow("test", skip_job_wait=True) assert run_id == 456 - ws.jobs.run_now.assert_called_once_with(123) + ws.jobs.run_now.assert_called_once_with(123, job_parameters=None) ws.jobs.wait_get_run_job_terminated_or_skipped.assert_not_called() @@ -121,7 +121,7 @@ def test_run_workflow_operation_failed(mock_installation) -> None: with pytest.raises(NotFound, match="a_table"): _ = workflows.run_workflow("test") - ws.jobs.run_now.assert_called_once_with(123) + ws.jobs.run_now.assert_called_once_with(123, job_parameters=None) ws.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once_with(run_id=456, timeout=timedelta(minutes=20)) ws.jobs.get_run.assert_called_once_with(456) ws.workspace.list.assert_called_once_with("~/mock/logs/test") @@ -139,6 +139,6 @@ def test_run_workflow_timeout(mock_installation) -> None: with pytest.raises(TimeoutError): _ = workflows.run_workflow("test", max_wait=timedelta(minutes=2)) - ws.jobs.run_now.assert_called_once_with(123) + ws.jobs.run_now.assert_called_once_with(123, job_parameters=None) ws.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once_with(run_id=456, timeout=timedelta(minutes=2)) ws.workspace.list.assert_called_once_with("~/mock/logs/test") diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index c35c20ef21..6699d9bdad 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -406,7 +406,7 @@ def test_ensure_assessment_run_collection(workspace_clients, acc_client): ensure_assessment_run(workspace_clients[0], run_as_collection=True, a=acc_client) for workspace_client in workspace_clients: - workspace_client.jobs.run_now.assert_called_with(123) + workspace_client.jobs.run_now.assert_called_with(123, job_parameters=None) def test_repair_run(ws): @@ -1035,7 +1035,7 @@ def 
test_migrate_tables_calls_migrate_table_job_run_now(
     migrate_tables(workspace_clients[0], MockPrompts({}), run_as_collection=run_as_collection, a=acc_client)
 
     for workspace_client in workspace_clients:
-        workspace_client.jobs.run_now.assert_called_with(456)
+        workspace_client.jobs.run_now.assert_called_with(456, job_parameters=None)
         workspace_client.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once()
 
 
@@ -1079,7 +1079,7 @@ def test_migrate_tables_calls_external_hiveserde_tables_job_run_now(ws) -> None:
 
     migrate_tables(ws, prompts, ctx=ctx)
 
-    ws.jobs.run_now.assert_called_with(789)
+    ws.jobs.run_now.assert_called_with(789, job_parameters=None)
     ws.jobs.wait_get_run_job_terminated_or_skipped.call_count = 2
 
 
@@ -1114,7 +1114,7 @@ def test_migrate_tables_calls_external_tables_ctas_job_run_now(ws) -> None:
 
     migrate_tables(ws, prompts, ctx=ctx)
 
-    ws.jobs.run_now.assert_called_with(987)
+    ws.jobs.run_now.assert_called_with(987, job_parameters=None)
     ws.jobs.wait_get_run_job_terminated_or_skipped.call_count = 2
 

From 986e0da7cbec53c5b96f789804ecededff2a7500 Mon Sep 17 00:00:00 2001
From: pritishpai
Date: Mon, 7 Jul 2025 12:07:50 -0400
Subject: [PATCH 18/18] Update cli and docs

---
 docs/ucx/docs/reference/commands/index.mdx  | 3 ++-
 docs/ucx/docs/reference/workflows/index.mdx | 1 +
 labs.yml                                    | 2 +-
 3 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/docs/ucx/docs/reference/commands/index.mdx b/docs/ucx/docs/reference/commands/index.mdx
index 96a67fe623..c60119ff99 100644
--- a/docs/ucx/docs/reference/commands/index.mdx
+++ b/docs/ucx/docs/reference/commands/index.mdx
@@ -731,13 +731,14 @@ are displayed. To display `DEBUG` logs, use the `--debug` flag.
 ### `ensure-assessment-run`
 
 ```commandline
-databricks labs ucx ensure-assessment-run
+databricks labs ucx ensure-assessment-run [--force-refresh true/false]
 ```
 
 This command ensures that the [assessment workflow](/docs/reference/workflows#assessment-workflow) was run on a workspace.
 This command will block until job finishes.
 Failed workflows can be fixed with the [`repair-run` command](/docs/reference/commands#repair-run).
 Workflows and their status can be listed with the [`workflows` command](/docs/reference/commands#workflows).
+The `--force-refresh` flag forces the assessment workflow to run again, overwriting the previous assessment results, even if it has already been run.
 
 ### `update-migration-progress`
 
diff --git a/docs/ucx/docs/reference/workflows/index.mdx b/docs/ucx/docs/reference/workflows/index.mdx
index fb5c0a420c..be5110b318 100644
--- a/docs/ucx/docs/reference/workflows/index.mdx
+++ b/docs/ucx/docs/reference/workflows/index.mdx
@@ -100,6 +100,7 @@ Proceed to the [group migration workflow](/docs/reference/workflows#group-migrat
 
 > ⚠️ Caution: To fully refresh the UCX assessment workflow and overwrite existing results, set the force_refresh parameter to true (or True). For large workspaces, this
 process can be time-consuming and resource-intensive. Only use this option if a complete reassessment is absolutely necessary.
+  Alternatively, you can re-run the assessment using the [command](/docs/reference/commands#ensure-assessment-run) with the `force-refresh` parameter set to `true` to overwrite existing results.
 
## Group migration workflow diff --git a/labs.yml b/labs.yml index 97f805aced..69dc613f83 100644 --- a/labs.yml +++ b/labs.yml @@ -87,7 +87,7 @@ commands: - name: run-as-collection description: (Optional) Whether to check (and run if necessary) the assessment for the collection of workspaces with ucx installed. Default is false. - - name: force_refresh + - name: force-refresh description: (Optional) Force a full refresh of the assessment job, even if it was run recently. Default is false.
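
Taken together, the series threads the option end to end: `labs.yml` exposes a `--force-refresh` flag, the CLI command translates it into the workflow's `force_refresh` named parameter, and each assessment crawler then snapshots with `force_refresh=True`. A sketch of that plumbing under simplified assumptions (the `run_workflow` stand-in below merely echoes its arguments instead of calling the Databricks Jobs API, and the control flow is condensed from the actual command):

```python
def run_workflow(step: str, named_parameters: dict[str, str] | None = None) -> None:
    """Stand-in for DeployedWorkflows.run_workflow: echoes instead of running a job."""
    print(f"run_now(step={step!r}, job_parameters={named_parameters})")


def ensure_assessment_run(force_refresh: bool = False) -> None:
    """Sketch of the CLI command: only a forced run passes the named parameter."""
    if force_refresh:
        # Overwrites prior results: each crawler task re-snapshots with force_refresh=True.
        run_workflow("assessment", named_parameters={"force_refresh": "true"})
    else:
        run_workflow("assessment")


# Equivalent to: databricks labs ucx ensure-assessment-run --force-refresh true
ensure_assessment_run(force_refresh=True)
# The default invocation leaves existing assessment results untouched:
ensure_assessment_run()
```
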