class Assessment(Workflow):  # pylint: disable=too-many-public-methods
-    def __init__(self):
+    def __init__(self, force_refresh: bool = False):
+        self._force_refresh = force_refresh
        super().__init__('assessment')

    @job_task
@@ -18,14 +19,14 @@ def crawl_tables(self, ctx: RuntimeContext):
        `$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
        stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
        cannot easily be migrated to Unity Catalog."""
-        ctx.tables_crawler.snapshot(force_refresh=True)
+        ctx.tables_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task
    def crawl_udfs(self, ctx: RuntimeContext):
        """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the
        table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for
        issues with grants that cannot be migrated to Unity Catalog."""
-        ctx.udfs_crawler.snapshot(force_refresh=True)
+        ctx.udfs_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task(job_cluster="tacl")
    def setup_tacl(self, ctx: RuntimeContext):
@@ -40,15 +41,15 @@ def crawl_grants(self, ctx: RuntimeContext):
        Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table
        ACLs enabled and available for retrieval."""
-        ctx.grants_crawler.snapshot(force_refresh=True)
+        ctx.grants_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task(depends_on=[crawl_tables])
    def estimate_table_size_for_migration(self, ctx: RuntimeContext):
        """Scans the previously created Delta table named `$inventory_database.tables` and locates tables that cannot be
        "synced". These tables will have to be cloned in the migration process.
        Assesses the size of these tables and creates the `$inventory_database.table_size` table to list these sizes.
        The table size is a factor in deciding whether to clone these tables."""
-        ctx.table_size_crawler.snapshot(force_refresh=True)
+        ctx.table_size_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task
    def crawl_mounts(self, ctx: RuntimeContext):
@@ -58,7 +59,7 @@ def crawl_mounts(self, ctx: RuntimeContext):
        The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently
        storing this information in the `$inventory.mounts` table. This is crucial for planning the migration."""
-        ctx.mounts_crawler.snapshot(force_refresh=True)
+        ctx.mounts_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task(depends_on=[crawl_mounts, crawl_tables])
    def guess_external_locations(self, ctx: RuntimeContext):
@@ -70,7 +71,7 @@ def guess_external_locations(self, ctx: RuntimeContext):
        - Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead
        - Scanning all these locations to identify folders that can act as shared path prefixes
        - These identified external locations will be created subsequently prior to the actual table migration"""
-        ctx.external_locations.snapshot(force_refresh=True)
+        ctx.external_locations.snapshot(force_refresh=self._force_refresh)

    @job_task
    def assess_jobs(self, ctx: RuntimeContext):
@@ -83,7 +84,7 @@ def assess_jobs(self, ctx: RuntimeContext):
        - Clusters with incompatible Spark config tags
        - Clusters referencing DBFS locations in one or more config options
        """
-        ctx.jobs_crawler.snapshot(force_refresh=True)
+        ctx.jobs_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task
    def assess_clusters(self, ctx: RuntimeContext):
@@ -96,7 +97,7 @@ def assess_clusters(self, ctx: RuntimeContext):
        - Clusters with incompatible Spark config tags
        - Clusters referencing DBFS locations in one or more config options
        """
-        ctx.clusters_crawler.snapshot(force_refresh=True)
+        ctx.clusters_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task
    def assess_pipelines(self, ctx: RuntimeContext):
@@ -109,7 +110,7 @@ def assess_pipelines(self, ctx: RuntimeContext):
        Subsequently, a list of all the pipelines with matching configurations is stored in the
        `$inventory.pipelines` table."""
-        ctx.pipelines_crawler.snapshot(force_refresh=True)
+        ctx.pipelines_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task
    def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
@@ -122,7 +123,7 @@ def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
        It also combines several submit runs under a single pseudo_id based on a hash of the submit run configuration.
        Subsequently, a list of all the incompatible runs with failures is stored in the
        `$inventory.submit_runs` table."""
-        ctx.submit_runs_crawler.snapshot(force_refresh=True)
+        ctx.submit_runs_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task
    def crawl_cluster_policies(self, ctx: RuntimeContext):
@@ -133,7 +134,7 @@ def crawl_cluster_policies(self, ctx: RuntimeContext):
        Subsequently, a list of all the policies with matching configurations is stored in the
        `$inventory.policies` table."""
-        ctx.policies_crawler.snapshot(force_refresh=True)
+        ctx.policies_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task(cloud="azure")
    def assess_azure_service_principals(self, ctx: RuntimeContext):
@@ -147,7 +148,7 @@ def assess_azure_service_principals(self, ctx: RuntimeContext):
        Subsequently, the list of all the Azure Service Principals referred to in those configurations is saved
        in the `$inventory.azure_service_principals` table."""
        if ctx.is_azure:
-            ctx.azure_service_principal_crawler.snapshot(force_refresh=True)
+            ctx.azure_service_principal_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task
    def assess_global_init_scripts(self, ctx: RuntimeContext):
@@ -156,7 +157,7 @@ def assess_global_init_scripts(self, ctx: RuntimeContext):
        It looks in:
        - the list of all the global init scripts; these are saved in the `$inventory.global_init_scripts` table."""
-        ctx.global_init_scripts_crawler.snapshot(force_refresh=True)
+        ctx.global_init_scripts_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task
    def workspace_listing(self, ctx: RuntimeContext):
@@ -168,7 +169,7 @@ def workspace_listing(self, ctx: RuntimeContext):
        if not ctx.config.use_legacy_permission_migration:
            logger.info("Skipping workspace listing as legacy permission migration is disabled.")
            return
-        ctx.workspace_listing.snapshot(force_refresh=True)
+        ctx.workspace_listing.snapshot(force_refresh=self._force_refresh)

    @job_task(depends_on=[crawl_grants, workspace_listing])
    def crawl_permissions(self, ctx: RuntimeContext):
@@ -182,22 +183,22 @@ def crawl_permissions(self, ctx: RuntimeContext):
            return
        permission_manager = ctx.permission_manager
        permission_manager.reset()
-        permission_manager.snapshot(force_refresh=True)
+        permission_manager.snapshot(force_refresh=self._force_refresh)

    @job_task
    def crawl_groups(self, ctx: RuntimeContext):
        """Scans all groups for the local group migration scope"""
-        ctx.group_manager.snapshot(force_refresh=True)
+        ctx.group_manager.snapshot(force_refresh=self._force_refresh)

    @job_task
    def crawl_redash_dashboards(self, ctx: RuntimeContext):
        """Scans all Redash dashboards."""
-        ctx.redash_crawler.snapshot(force_refresh=True)
+        ctx.redash_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task
    def crawl_lakeview_dashboards(self, ctx: RuntimeContext):
        """Scans all Lakeview dashboards."""
-        ctx.lakeview_crawler.snapshot(force_refresh=True)
+        ctx.lakeview_crawler.snapshot(force_refresh=self._force_refresh)

    @job_task(depends_on=[crawl_redash_dashboards, crawl_lakeview_dashboards])
    def assess_dashboards(self, ctx: RuntimeContext):
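Taken together, the diff threads a single `force_refresh` flag through the workflow instead of hard-coding `force_refresh=True` in every task: with the new default of `False`, each task can reuse a previously persisted inventory snapshot and only re-crawls on demand. Below is a minimal sketch of the snapshot-caching pattern these tasks appear to rely on; `FakeCrawler` and its return values are illustrative stand-ins, not UCX's actual crawler API.

```python
# Illustrative sketch only: a stand-in for the crawlers used above, showing
# the force_refresh semantics that the Assessment constructor now controls.
class FakeCrawler:
    def __init__(self) -> None:
        self._cache: list[str] | None = None  # persisted snapshot stand-in

    def snapshot(self, *, force_refresh: bool = False) -> list[str]:
        # Re-crawl when explicitly forced, or when no snapshot exists yet;
        # otherwise return the previously persisted inventory.
        if force_refresh or self._cache is None:
            self._cache = ["hive_metastore.db.table_a"]  # stand-in for a crawl
        return self._cache


# Hypothetical usage of the new constructor parameter:
#   Assessment()                    -> tasks reuse cached snapshots
#   Assessment(force_refresh=True)  -> tasks rebuild every inventory table
```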