Skip to content

Commit 3c34640

Browse files
committed
Merge branch 'migration-sequencing-phase-1' into migration-sequencing-phase-2
2 parents 1502ad8 + 8c666e5 commit 3c34640

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

56 files changed: +1046 lines added, -348 lines removed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ dependencies = ["databricks-sdk~=0.30",
4848
"databricks-labs-lsql>=0.5,<0.13",
4949
"databricks-labs-blueprint>=0.9.1,<0.10",
5050
"PyYAML>=6.0.0,<7.0.0",
51-
"sqlglot>=25.5.0,<25.27",
51+
"sqlglot>=25.5.0,<25.28",
5252
"astroid>=3.3.1"]
5353

5454
[project.optional-dependencies]

src/databricks/labs/ucx/assessment/azure.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ class ServicePrincipalClusterMapping:
4141

4242

4343
class AzureServicePrincipalCrawler(CrawlerBase[AzureServicePrincipalInfo], JobsMixin, SecretsMixin):
44-
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
45-
super().__init__(sbe, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo)
44+
def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
45+
super().__init__(sql_backend, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo)
4646
self._ws = ws
4747

4848
def _try_fetch(self) -> Iterable[AzureServicePrincipalInfo]:

src/databricks/labs/ucx/assessment/clusters.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ def _check_cluster_failures(self, cluster: ClusterDetails, source: str) -> list[
159159

160160

161161
class ClustersCrawler(CrawlerBase[ClusterInfo], CheckClusterMixin):
162-
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema: str):
163-
super().__init__(sbe, "hive_metastore", schema, "clusters", ClusterInfo)
162+
def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema: str):
163+
super().__init__(sql_backend, "hive_metastore", schema, "clusters", ClusterInfo)
164164
self._ws = ws
165165

166166
def _crawl(self) -> Iterable[ClusterInfo]:
@@ -214,8 +214,8 @@ class PolicyInfo:
214214

215215

216216
class PoliciesCrawler(CrawlerBase[PolicyInfo], CheckClusterMixin):
217-
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
218-
super().__init__(sbe, "hive_metastore", schema, "policies", PolicyInfo)
217+
def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
218+
super().__init__(sql_backend, "hive_metastore", schema, "policies", PolicyInfo)
219219
self._ws = ws
220220

221221
def _crawl(self) -> Iterable[PolicyInfo]:

src/databricks/labs/ucx/assessment/init_scripts.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ def check_init_script(self, init_script_data: str | None, source: str) -> list[s
4141

4242

4343
class GlobalInitScriptCrawler(CrawlerBase[GlobalInitScriptInfo], CheckInitScriptMixin):
44-
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
45-
super().__init__(sbe, "hive_metastore", schema, "global_init_scripts", GlobalInitScriptInfo)
44+
def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
45+
super().__init__(sql_backend, "hive_metastore", schema, "global_init_scripts", GlobalInitScriptInfo)
4646
self._ws = ws
4747

4848
def _crawl(self) -> Iterable[GlobalInitScriptInfo]:

src/databricks/labs/ucx/assessment/jobs.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ def _job_clusters(job: BaseJob) -> Iterable[tuple[BaseJob, ClusterSpec]]:
9494

9595

9696
class JobsCrawler(CrawlerBase[JobInfo], JobsMixin, CheckClusterMixin):
97-
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
98-
super().__init__(sbe, "hive_metastore", schema, "jobs", JobInfo)
97+
def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
98+
super().__init__(sql_backend, "hive_metastore", schema, "jobs", JobInfo)
9999
self._ws = ws
100100

101101
def _crawl(self) -> Iterable[JobInfo]:
@@ -182,8 +182,8 @@ class SubmitRunsCrawler(CrawlerBase[SubmitRunInfo], JobsMixin, CheckClusterMixin
182182
"fs.adl",
183183
]
184184

185-
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema: str, num_days_history: int):
186-
super().__init__(sbe, "hive_metastore", schema, "submit_runs", SubmitRunInfo)
185+
def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema: str, num_days_history: int):
186+
super().__init__(sql_backend, "hive_metastore", schema, "submit_runs", SubmitRunInfo)
187187
self._ws = ws
188188
self._num_days_history = num_days_history
189189

src/databricks/labs/ucx/assessment/pipelines.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ class PipelineInfo:
2929

3030

3131
class PipelinesCrawler(CrawlerBase[PipelineInfo], CheckClusterMixin):
32-
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
33-
super().__init__(sbe, "hive_metastore", schema, "pipelines", PipelineInfo)
32+
def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
33+
super().__init__(sql_backend, "hive_metastore", schema, "pipelines", PipelineInfo)
3434
self._ws = ws
3535

3636
def _crawl(self) -> Iterable[PipelineInfo]:

src/databricks/labs/ucx/contexts/application.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from databricks.labs.ucx.recon.metadata_retriever import DatabricksTableMetadataRetriever
1818
from databricks.labs.ucx.recon.migration_recon import MigrationRecon
1919
from databricks.labs.ucx.recon.schema_comparator import StandardSchemaComparator
20-
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
20+
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler, DirectFsAccessOwnership
2121
from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver
2222
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
2323
from databricks.sdk import AccountClient, WorkspaceClient, core
@@ -28,7 +28,7 @@
2828
from databricks.labs.ucx.assessment.export import AssessmentExporter
2929
from databricks.labs.ucx.aws.credentials import CredentialManager
3030
from databricks.labs.ucx.config import WorkspaceConfig
31-
from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership
31+
from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership, LegacyQueryOwnership
3232
from databricks.labs.ucx.hive_metastore import ExternalLocations, MountsCrawler, TablesCrawler
3333
from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
3434
from databricks.labs.ucx.hive_metastore.grants import (
@@ -43,13 +43,13 @@
4343
PrincipalACL,
4444
)
4545
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
46-
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationOwnership
46+
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
47+
from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership
4748
from databricks.labs.ucx.hive_metastore.table_migrate import (
4849
TableMigrationStatusRefresher,
4950
TablesMigrator,
5051
)
5152
from databricks.labs.ucx.hive_metastore.table_move import TableMove
52-
from databricks.labs.ucx.hive_metastore.tables import TableOwnership
5353
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership
5454
from databricks.labs.ucx.hive_metastore.verification import VerifyHasCatalog, VerifyHasMetastore
5555
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
@@ -263,12 +263,32 @@ def tables_crawler(self) -> TablesCrawler:
263263

264264
@cached_property
265265
def table_ownership(self) -> TableOwnership:
266-
return TableOwnership(self.administrator_locator)
266+
return TableOwnership(
267+
self.administrator_locator,
268+
self.grants_crawler,
269+
self.used_tables_crawler_for_paths,
270+
self.used_tables_crawler_for_queries,
271+
self.legacy_query_ownership,
272+
self.workspace_path_ownership,
273+
)
267274

268275
@cached_property
269276
def workspace_path_ownership(self) -> WorkspacePathOwnership:
270277
return WorkspacePathOwnership(self.administrator_locator, self.workspace_client)
271278

279+
@cached_property
280+
def legacy_query_ownership(self) -> LegacyQueryOwnership:
281+
return LegacyQueryOwnership(self.administrator_locator, self.workspace_client)
282+
283+
@cached_property
284+
def directfs_access_ownership(self) -> DirectFsAccessOwnership:
285+
return DirectFsAccessOwnership(
286+
self.administrator_locator,
287+
self.workspace_path_ownership,
288+
self.legacy_query_ownership,
289+
self.workspace_client,
290+
)
291+
272292
@cached_property
273293
def tables_migrator(self) -> TablesMigrator:
274294
return TablesMigrator(

src/databricks/labs/ucx/contexts/workflow_task.py

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
2727
from databricks.labs.ucx.hive_metastore.udfs import Udf
2828
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
29-
from databricks.labs.ucx.progress.history import HistoryLog
29+
from databricks.labs.ucx.progress.history import ProgressEncoder
30+
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
3031
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder
3132

3233
# As with GlobalContext, service factories unavoidably have a lot of public methods.
@@ -137,7 +138,7 @@ def task_run_warning_recorder(self) -> TaskRunWarningRecorder:
137138
self._config_path.parent,
138139
self.named_parameters["workflow"],
139140
int(self.named_parameters["job_id"]),
140-
int(self.named_parameters["parent_run_id"]),
141+
self.parent_run_id,
141142
self.sql_backend,
142143
self.inventory_database,
143144
int(self.named_parameters.get("attempt", "0")),
@@ -151,7 +152,7 @@ def workflow_run_recorder(self) -> WorkflowRunRecorder:
151152
workspace_id=self.workspace_id,
152153
workflow_name=self.named_parameters["workflow"],
153154
workflow_id=int(self.named_parameters["job_id"]),
154-
workflow_run_id=int(self.named_parameters["parent_run_id"]),
155+
workflow_run_id=self.parent_run_id,
155156
workflow_run_attempt=int(self.named_parameters.get("attempt", 0)),
156157
workflow_start_time=self.named_parameters["start_time"],
157158
)
@@ -161,89 +162,94 @@ def workspace_id(self) -> int:
161162
return self.workspace_client.get_workspace_id()
162163

163164
@cached_property
164-
def historical_clusters_log(self) -> HistoryLog[ClusterInfo]:
165-
return HistoryLog(
165+
def parent_run_id(self) -> int:
166+
return int(self.named_parameters["parent_run_id"])
167+
168+
@cached_property
169+
def clusters_progress(self) -> ProgressEncoder[ClusterInfo]:
170+
return ProgressEncoder(
166171
self.sql_backend,
167172
self.cluster_ownership,
168173
ClusterInfo,
169-
int(self.named_parameters["parent_run_id"]),
174+
self.parent_run_id,
170175
self.workspace_id,
171176
self.config.ucx_catalog,
172177
)
173178

174179
@cached_property
175-
def historical_cluster_policies_log(self) -> HistoryLog[PolicyInfo]:
176-
return HistoryLog(
180+
def policies_progress(self) -> ProgressEncoder[PolicyInfo]:
181+
return ProgressEncoder(
177182
self.sql_backend,
178183
self.cluster_policy_ownership,
179184
PolicyInfo,
180-
int(self.named_parameters["parent_run_id"]),
185+
self.parent_run_id,
181186
self.workspace_id,
182187
self.config.ucx_catalog,
183188
)
184189

185190
@cached_property
186-
def historical_grants_log(self) -> HistoryLog[Grant]:
187-
return HistoryLog(
191+
def grants_progress(self) -> ProgressEncoder[Grant]:
192+
return ProgressEncoder(
188193
self.sql_backend,
189194
self.grant_ownership,
190195
Grant,
191-
int(self.named_parameters["parent_run_id"]),
196+
self.parent_run_id,
192197
self.workspace_id,
193198
self.config.ucx_catalog,
194199
)
195200

196201
@cached_property
197-
def historical_jobs_log(self) -> HistoryLog[JobInfo]:
198-
return HistoryLog(
202+
def jobs_progress(self) -> ProgressEncoder[JobInfo]:
203+
return JobsProgressEncoder(
199204
self.sql_backend,
200205
self.job_ownership,
201-
JobInfo,
202-
int(self.named_parameters["parent_run_id"]),
206+
self.inventory_database,
207+
self.parent_run_id,
203208
self.workspace_id,
204209
self.config.ucx_catalog,
205210
)
206211

207212
@cached_property
208-
def historical_pipelines_log(self) -> HistoryLog[PipelineInfo]:
209-
return HistoryLog(
213+
def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
214+
return ProgressEncoder(
210215
self.sql_backend,
211216
self.pipeline_ownership,
212217
PipelineInfo,
213-
int(self.named_parameters["parent_run_id"]),
218+
self.parent_run_id,
214219
self.workspace_id,
215220
self.config.ucx_catalog,
216221
)
217222

218223
@cached_property
219-
def historical_tables_log(self) -> HistoryLog[Table]:
220-
return HistoryLog(
224+
def tables_progress(self) -> ProgressEncoder[Table]:
225+
return ProgressEncoder(
221226
self.sql_backend,
222227
self.table_ownership,
223228
Table,
224-
int(self.named_parameters["parent_run_id"]),
229+
self.parent_run_id,
225230
self.workspace_id,
226231
self.config.ucx_catalog,
227232
)
228233

229234
@cached_property
230-
def historical_table_migration_log(self) -> HistoryLog[TableMigrationStatus]:
231-
return HistoryLog(
235+
def historical_table_migration_log(self) -> ProgressEncoder[TableMigrationStatus]:
236+
# TODO: merge into tables_progress
237+
return ProgressEncoder(
232238
self.sql_backend,
233239
self.table_migration_ownership,
234240
TableMigrationStatus,
235-
int(self.named_parameters["parent_run_id"]),
241+
self.parent_run_id,
236242
self.workspace_id,
237243
self.config.ucx_catalog,
238244
)
239245

240246
@cached_property
241-
def historical_udfs_log(self) -> HistoryLog[Udf]:
242-
return HistoryLog(
247+
def udfs_progress(self) -> ProgressEncoder[Udf]:
248+
return ProgressEncoder(
243249
self.sql_backend,
244250
self.udf_ownership,
245251
Udf,
246-
int(self.named_parameters["parent_run_id"]),
252+
self.parent_run_id,
247253
self.workspace_id,
248254
self.config.ucx_catalog,
249255
)

src/databricks/labs/ucx/framework/crawlers.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ class DataclassInstance(Protocol):
2121

2222

2323
class CrawlerBase(ABC, Generic[Result]):
24-
def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]) -> None:
24+
def __init__(self, sql_backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]) -> None:
2525
"""
2626
Initializes a CrawlerBase instance.
2727
2828
Args:
29-
backend (SqlBackend): The backend that executes SQL queries:
29+
sql_backend (SqlBackend): The backend that executes SQL queries:
3030
Statement Execution API or Databricks Runtime.
3131
catalog (str): The catalog name for the inventory persistence.
3232
schema: The schema name for the inventory persistence.
@@ -35,9 +35,9 @@ def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, k
3535
self._catalog = self._valid(catalog)
3636
self._schema = self._valid(schema)
3737
self._table = self._valid(table)
38-
self._backend = backend
39-
self._fetch = backend.fetch
40-
self._exec = backend.execute
38+
self._sql_backend = sql_backend
39+
self._fetch = sql_backend.fetch
40+
self._exec = sql_backend.execute
4141
self._klass = klass
4242

4343
@property
@@ -161,4 +161,4 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool)
161161

162162
def _update_snapshot(self, items: Sequence[Result], *, mode: Literal["append", "overwrite"]) -> None:
163163
logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}")
164-
self._backend.save_table(self.full_name, items, self._klass, mode=mode)
164+
self._sql_backend.save_table(self.full_name, items, self._klass, mode=mode)

src/databricks/labs/ucx/framework/owners.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceCli
201201
super().__init__(administrator_locator)
202202
self._ws = ws
203203

204+
def owner_of_path(self, path: str) -> str:
205+
return self.owner_of(WorkspacePath(self._ws, path))
206+
204207
@retried(on=[InternalError], timeout=timedelta(minutes=1))
205208
def _maybe_direct_owner(self, record: WorkspacePath) -> str | None:
206209
maybe_type_and_id = self._maybe_type_and_id(record)
@@ -237,3 +240,18 @@ def _infer_from_first_can_manage(object_permissions):
237240
return acl.group_name
238241
return acl.service_principal_name
239242
return None
243+
244+
245+
class LegacyQueryOwnership(Ownership[str]):
246+
def __init__(self, administrator_locator: AdministratorLocator, workspace_client: WorkspaceClient) -> None:
247+
super().__init__(administrator_locator)
248+
self._workspace_client = workspace_client
249+
250+
def _maybe_direct_owner(self, record: str) -> str | None:
251+
try:
252+
legacy_query = self._workspace_client.queries.get(record)
253+
return legacy_query.owner_user_name
254+
except NotFound:
255+
return None
256+
except InternalError: # redash is very naughty and throws 500s instead of proper 404s
257+
return None

0 commit comments

Comments
 (0)