 import logging
 
+from databricks.sdk.service.jobs import JobParameterDefinition
+
 from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
 from databricks.labs.ucx.framework.tasks import Workflow, job_task
 
 logger = logging.getLogger(__name__)
 
 
 class Assessment(Workflow):  # pylint: disable=too-many-public-methods
-    def __init__(self, force_refresh: bool = False):
-        self._force_refresh = force_refresh
-        super().__init__('assessment')
+    def __init__(self):
+        self._force_refresh_param = JobParameterDefinition(name="force_refresh", default=False)
+        super().__init__('assessment', [self._force_refresh_param])
 
     @job_task
     def crawl_tables(self, ctx: RuntimeContext):
         """Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such
         as _database name_, _table name_, _table type_, _table location_, etc., in the Delta table named
         `$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
         stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
         cannot easily be migrated to Unity Catalog."""
-        ctx.tables_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.tables_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_udfs(self, ctx: RuntimeContext):
         """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the
         table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for
         issues with grants that cannot be migrated to Unity Catalog."""
-        ctx.udfs_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.udfs_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(job_cluster="tacl")
     def setup_tacl(self, ctx: RuntimeContext):
@@ -41,15 +45,17 @@ def crawl_grants(self, ctx: RuntimeContext):
 
         Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table
         ACLs enabled and available for retrieval."""
-        ctx.grants_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.grants_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_tables])
     def estimate_table_size_for_migration(self, ctx: RuntimeContext):
         """Scans the previously created Delta table named `$inventory_database.tables` and locates tables that cannot be
         "synced". These tables will have to be cloned in the migration process.
         Assesses the size of these tables and creates the `$inventory_database.table_size` table to list these sizes.
         The table size is a factor in deciding whether to clone these tables."""
-        ctx.table_size_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.table_size_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_mounts(self, ctx: RuntimeContext):
@@ -59,7 +65,8 @@ def crawl_mounts(self, ctx: RuntimeContext):
 
         The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently
         storing this information in the `$inventory.mounts` table. This is crucial for planning the migration."""
-        ctx.mounts_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.mounts_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_mounts, crawl_tables])
     def guess_external_locations(self, ctx: RuntimeContext):
@@ -71,7 +78,8 @@ def guess_external_locations(self, ctx: RuntimeContext):
         - Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead
         - Scanning all these locations to identify folders that can act as shared path prefixes
         - These identified external locations will be created subsequently prior to the actual table migration"""
-        ctx.external_locations.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.external_locations.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_jobs(self, ctx: RuntimeContext):
@@ -84,7 +92,8 @@ def assess_jobs(self, ctx: RuntimeContext):
         - Clusters with incompatible Spark config tags
         - Clusters referencing DBFS locations in one or more config options
         """
-        ctx.jobs_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.jobs_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_clusters(self, ctx: RuntimeContext):
@@ -97,7 +106,8 @@ def assess_clusters(self, ctx: RuntimeContext):
         - Clusters with incompatible Spark config tags
         - Clusters referencing DBFS locations in one or more config options
         """
-        ctx.clusters_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.clusters_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_pipelines(self, ctx: RuntimeContext):
@@ -110,7 +120,8 @@ def assess_pipelines(self, ctx: RuntimeContext):
 
         Subsequently, a list of all the pipelines with matching configurations is stored in the
         `$inventory.pipelines` table."""
-        ctx.pipelines_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.pipelines_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
@@ -123,7 +134,8 @@ def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
         It also combines several submit runs under a single pseudo_id based on a hash of the submit run configuration.
         Subsequently, a list of all the incompatible runs with failures is stored in the
         `$inventory.submit_runs` table."""
-        ctx.submit_runs_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.submit_runs_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_cluster_policies(self, ctx: RuntimeContext):
@@ -134,7 +146,8 @@ def crawl_cluster_policies(self, ctx: RuntimeContext):
 
         Subsequently, a list of all the policies with matching configurations is stored in the
         `$inventory.policies` table."""
-        ctx.policies_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.policies_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(cloud="azure")
     def assess_azure_service_principals(self, ctx: RuntimeContext):
@@ -148,7 +161,8 @@ def assess_azure_service_principals(self, ctx: RuntimeContext):
         Subsequently, the list of all the Azure Service Principals referred to in those configurations is saved
         in the `$inventory.azure_service_principals` table."""
         if ctx.is_azure:
-            ctx.azure_service_principal_crawler.snapshot(force_refresh=self._force_refresh)
+            force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+            ctx.azure_service_principal_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def assess_global_init_scripts(self, ctx: RuntimeContext):
@@ -157,7 +171,8 @@ def assess_global_init_scripts(self, ctx: RuntimeContext):
 
         It looks in:
         - the list of all the global init scripts, which are saved in the `$inventory.global_init_scripts` table."""
-        ctx.global_init_scripts_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.global_init_scripts_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def workspace_listing(self, ctx: RuntimeContext):
@@ -169,7 +184,8 @@ def workspace_listing(self, ctx: RuntimeContext):
         if not ctx.config.use_legacy_permission_migration:
             logger.info("Skipping workspace listing as legacy permission migration is disabled.")
             return
-        ctx.workspace_listing.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.workspace_listing.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_grants, workspace_listing])
     def crawl_permissions(self, ctx: RuntimeContext):
@@ -183,22 +199,26 @@ def crawl_permissions(self, ctx: RuntimeContext):
             return
         permission_manager = ctx.permission_manager
         permission_manager.reset()
-        permission_manager.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        permission_manager.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_groups(self, ctx: RuntimeContext):
         """Scans all groups for the local group migration scope"""
-        ctx.group_manager.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.group_manager.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_redash_dashboards(self, ctx: RuntimeContext):
         """Scans all Redash dashboards."""
-        ctx.redash_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.redash_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task
     def crawl_lakeview_dashboards(self, ctx: RuntimeContext):
         """Scans all Lakeview dashboards."""
-        ctx.lakeview_crawler.snapshot(force_refresh=self._force_refresh)
+        force_refresh = ctx.named_parameters.get("force_refresh", "False").lower() in ["true", "1"]
+        ctx.lakeview_crawler.snapshot(force_refresh=force_refresh)
 
     @job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards])
     def assess_dashboards(self, ctx: RuntimeContext):
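
The truthiness check repeated in every task above converts the string-typed job parameter into a boolean. As a reading aid, here is a minimal, self-contained sketch of that conversion; `parse_force_refresh` is a hypothetical helper name, since the PR itself inlines the expression in each task:

    def parse_force_refresh(named_parameters: dict[str, str]) -> bool:
        # Job parameters arrive as strings at runtime: "true" or "1"
        # (case-insensitive) opts in; anything else, including the
        # "False" default, leaves force_refresh off.
        return named_parameters.get("force_refresh", "False").lower() in ["true", "1"]

    assert parse_force_refresh({"force_refresh": "True"}) is True
    assert parse_force_refresh({"force_refresh": "1"}) is True
    assert parse_force_refresh({}) is False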
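On the constructor change: registering a `JobParameterDefinition` named `force_refresh` (defaulting to `False`) surfaces the flag as a job-level parameter on the deployed assessment job, rather than baking it into the `Workflow` instance at deploy time. A hedged sketch of how a caller might then opt into a forced refresh, assuming the databricks-sdk `run_now` API; the job ID below is a placeholder:

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()

    # Job parameters are passed as strings; the tasks above accept "true" or "1".
    # 1234 stands in for the deployed assessment job's ID.
    w.jobs.run_now(job_id=1234, job_parameters={"force_refresh": "true"})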