Commit a8d1fa2

Add a parameter to the assessment job for force_refresh of data
1 parent e7fb7ed commit a8d1fa2

3 files changed: +51 -24 lines changed

src/databricks/labs/ucx/assessment/workflows.py

Lines changed: 42 additions & 22 deletions
@@ -1,5 +1,7 @@
 import logging
 
+from databricks.sdk.service.jobs import JobParameterDefinition
+
 from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
 from databricks.labs.ucx.framework.tasks import Workflow, job_task
 
@@ -8,25 +10,27 @@
 
 
 class Assessment(Workflow):  # pylint: disable=too-many-public-methods
-    def __init__(self, force_refresh: bool = False):
-        self._force_refresh = force_refresh
-        super().__init__('assessment')
+    def __init__(self):
+        self._force_refresh_param = JobParameterDefinition(name="force_refresh", default=False)
+        super().__init__('assessment', [self._force_refresh_param])
 
     @job_task
     def crawl_tables(self, ctx: RuntimeContext):
         """Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such
         as _database name_, _table name_, _table type_, _table location_, etc., in the Delta table named
         `$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
-        stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
+        stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
         cannot easily be migrated to Unity Catalog."""
-        ctx.tables_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.tables_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_udfs(self, ctx: RuntimeContext):
         """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the
         table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for
         issues with grants that cannot be migrated to Unit Catalog."""
-        ctx.udfs_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.udfs_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(job_cluster="tacl")
     def setup_tacl(self, ctx: RuntimeContext):
@@ -41,15 +45,17 @@ def crawl_grants(self, ctx: RuntimeContext):
 
         Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table
         ACLs enabled and available for retrieval."""
-        ctx.grants_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.grants_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_tables])
     def estimate_table_size_for_migration(self, ctx: RuntimeContext):
         """Scans the previously created Delta table named `$inventory_database.tables` and locate tables that cannot be
         "synced". These tables will have to be cloned in the migration process.
         Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes.
         The table size is a factor in deciding whether to clone these tables."""
-        ctx.table_size_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.table_size_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_mounts(self, ctx: RuntimeContext):
@@ -59,7 +65,8 @@ def crawl_mounts(self, ctx: RuntimeContext):
 
         The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently
         storing this information in the `$inventory.mounts` table. This is crucial for planning the migration."""
-        ctx.mounts_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.mounts_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_mounts, crawl_tables])
     def guess_external_locations(self, ctx: RuntimeContext):
@@ -71,7 +78,8 @@ def guess_external_locations(self, ctx: RuntimeContext):
         - Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead
         - Scanning all these locations to identify folders that can act as shared path prefixes
         - These identified external locations will be created subsequently prior to the actual table migration"""
-        ctx.external_locations.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.external_locations.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_jobs(self, ctx: RuntimeContext):
@@ -84,7 +92,8 @@ def assess_jobs(self, ctx: RuntimeContext):
         - Clusters with incompatible Spark config tags
         - Clusters referencing DBFS locations in one or more config options
         """
-        ctx.jobs_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.jobs_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_clusters(self, ctx: RuntimeContext):
@@ -97,7 +106,8 @@ def assess_clusters(self, ctx: RuntimeContext):
         - Clusters with incompatible spark config tags
         - Clusters referencing DBFS locations in one or more config options
         """
-        ctx.clusters_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.clusters_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_pipelines(self, ctx: RuntimeContext):
@@ -110,7 +120,8 @@ def assess_pipelines(self, ctx: RuntimeContext):
 
         Subsequently, a list of all the pipelines with matching configurations are stored in the
         `$inventory.pipelines` table."""
-        ctx.pipelines_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.pipelines_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
@@ -123,7 +134,8 @@ def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
         It also combines several submit runs under a single pseudo_id based on hash of the submit run configuration.
         Subsequently, a list of all the incompatible runs with failures are stored in the
         `$inventory.submit_runs` table."""
-        ctx.submit_runs_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.submit_runs_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_cluster_policies(self, ctx: RuntimeContext):
@@ -134,7 +146,8 @@ def crawl_cluster_policies(self, ctx: RuntimeContext):
 
         Subsequently, a list of all the policies with matching configurations are stored in the
         `$inventory.policies` table."""
-        ctx.policies_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.policies_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(cloud="azure")
     def assess_azure_service_principals(self, ctx: RuntimeContext):
@@ -148,7 +161,8 @@ def assess_azure_service_principals(self, ctx: RuntimeContext):
         Subsequently, the list of all the Azure Service Principals referred in those configurations are saved
         in the `$inventory.azure_service_principals` table."""
         if ctx.is_azure:
-            ctx.azure_service_principal_crawler.snapshot(force_refresh=self._force_refresh)
+            force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+            ctx.azure_service_principal_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_global_init_scripts(self, ctx: RuntimeContext):
@@ -157,7 +171,8 @@ def assess_global_init_scripts(self, ctx: RuntimeContext):
 
         It looks in:
         - the list of all the global init scripts are saved in the `$inventory.global_init_scripts` table."""
-        ctx.global_init_scripts_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.global_init_scripts_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def workspace_listing(self, ctx: RuntimeContext):
@@ -169,7 +184,8 @@ def workspace_listing(self, ctx: RuntimeContext):
         if not ctx.config.use_legacy_permission_migration:
             logger.info("Skipping workspace listing as legacy permission migration is disabled.")
             return
-        ctx.workspace_listing.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.workspace_listing.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_grants, workspace_listing])
     def crawl_permissions(self, ctx: RuntimeContext):
@@ -183,22 +199,26 @@ def crawl_permissions(self, ctx: RuntimeContext):
             return
         permission_manager = ctx.permission_manager
         permission_manager.reset()
-        permission_manager.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        permission_manager.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_groups(self, ctx: RuntimeContext):
         """Scans all groups for the local group migration scope"""
-        ctx.group_manager.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.group_manager.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_redash_dashboards(self, ctx: RuntimeContext):
         """Scans all Redash dashboards."""
-        ctx.redash_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.redash_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_lakeview_dashboards(self, ctx: RuntimeContext):
         """Scans all Lakeview dashboards."""
-        ctx.lakeview_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.lakeview_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards])
     def assess_dashboards(self, ctx: RuntimeContext):
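
Every task above repeats the same idiom because job parameter values reach the task as strings, not booleans. A minimal standalone sketch of that parsing, using a hypothetical parse_force_refresh helper that is not part of this commit:

    def parse_force_refresh(named_parameters: dict[str, str]) -> bool:
        # Job parameter values always arrive as strings; only "true"/"1"
        # (case-insensitive) count as truthy, anything else is falsy.
        return named_parameters.get("force_refresh", "False").lower() in ["true", "1"]

    assert parse_force_refresh({"force_refresh": "True"}) is True
    assert parse_force_refresh({"force_refresh": "1"}) is True
    assert parse_force_refresh({"force_refresh": "yes"}) is False  # not an accepted spelling
    assert parse_force_refresh({}) is False  # absent parameter falls back to "False"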

src/databricks/labs/ucx/framework/tasks.py

Lines changed: 8 additions & 2 deletions
@@ -6,7 +6,7 @@
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.core import Config
-from databricks.sdk.service.jobs import CronSchedule
+from databricks.sdk.service.jobs import CronSchedule, JobParameterDefinition
 
 from databricks.labs.ucx.config import WorkspaceConfig
 
@@ -65,8 +65,9 @@ def parse_args(*argv) -> dict[str, str]:
 
 
 class Workflow:
-    def __init__(self, name: str):
+    def __init__(self, name: str, named_parameters: list[JobParameterDefinition] | None = None):
         self._name = name
+        self._named_parameters = named_parameters
 
     @property
     def name(self):
@@ -77,6 +78,11 @@ def schedule(self) -> CronSchedule | None:
         """The default (cron) schedule for this workflow, or None if it is not scheduled."""
         return None
 
+    @property
+    def parameters(self) -> list[JobParameterDefinition] | None:
+        """Named parameters for this workflow, or None if there are no parameters."""
+        return self._named_parameters
+
     def tasks(self) -> Iterable[Task]:
         # return __task__ from every method in this class that has this attribute
         for attr in dir(self):
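
The base-class change makes named parameters available to any Workflow subclass, not just Assessment. A minimal sketch under that assumption; MyWorkflow and its dry_run parameter are hypothetical, while Workflow and JobParameterDefinition are the real classes this commit touches:

    from databricks.sdk.service.jobs import JobParameterDefinition

    from databricks.labs.ucx.framework.tasks import Workflow


    class MyWorkflow(Workflow):  # hypothetical subclass for illustration
        def __init__(self):
            # Declared parameters end up in the job definition; at runtime,
            # tasks read them back as strings from ctx.named_parameters.
            dry_run = JobParameterDefinition(name="dry_run", default=False)
            super().__init__('my-workflow', [dry_run])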

src/databricks/labs/ucx/installer/workflows.py

Lines changed: 1 addition & 0 deletions
@@ -855,6 +855,7 @@ def _job_settings(self, workflow_name: str, remote_wheels: list[str]) -> dict[st
             "email_notifications": email_notifications,
             "schedule": workflow.schedule,
             "tasks": job_tasks,
+            "parameters": workflow.parameters,
         }
 
     def _job_task(self, task: Task, remote_wheels: list[str]) -> jobs.Task:
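
With workflow.parameters wired into the job settings, the declared default can be overridden per run. A sketch using the Databricks Python SDK's run_now with job_parameters; the job id is a placeholder, not something this commit provides:

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()
    job_id = 1234  # placeholder: id of the installed assessment job
    # job_parameters values are strings; the task-side parsing above
    # treats "true" or "1" (case-insensitive) as truthy.
    w.jobs.run_now(job_id=job_id, job_parameters={"force_refresh": "true"})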
