Skip to content

Commit 2b4865e

Browse files
authored
Crawlers: append snapshots to history journal, if available (#2743)
## Changes

This PR introduces a history table where snapshots are journaled after each crawling operation.

### Linked issues

Progresses #2572, resolves #2573.

### Functionality

- [X] modified existing workflow: `migration-progress-experimental`

### Tests

- [X] added unit tests
- [X] updated integration tests
1 parent 9408ce2 commit 2b4865e

File tree

24 files changed

+1933
-66
lines changed

24 files changed

+1933
-66
lines changed

src/databricks/labs/ucx/assessment/clusters.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
from collections.abc import Iterable
55
from dataclasses import dataclass
6+
from typing import ClassVar
67

78
from databricks.labs.lsql.backends import SqlBackend
89
from databricks.sdk import WorkspaceClient
@@ -46,6 +47,8 @@ class ClusterInfo:
4647
creator: str | None = None
4748
"""User-name of the creator of the cluster, if known."""
4849

50+
__id_attributes__: ClassVar[tuple[str, ...]] = ("cluster_id",)
51+
4952

5053
class CheckClusterMixin(CheckInitScriptMixin):
5154
_ws: WorkspaceClient
@@ -203,6 +206,8 @@ class PolicyInfo:
203206
creator: str | None = None
204207
"""User-name of the creator of the cluster policy, if known."""
205208

209+
__id_attributes__: ClassVar[tuple[str, ...]] = ("policy_id",)
210+
206211

207212
class PoliciesCrawler(CrawlerBase[PolicyInfo], CheckClusterMixin):
208213
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):

src/databricks/labs/ucx/assessment/jobs.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dataclasses import dataclass
55
from datetime import datetime, timedelta, timezone
66
from hashlib import sha256
7+
from typing import ClassVar
78

89
from databricks.labs.lsql.backends import SqlBackend
910
from databricks.sdk import WorkspaceClient
@@ -40,6 +41,8 @@ class JobInfo:
4041
creator: str | None = None
4142
"""User-name of the creator of the pipeline, if known."""
4243

44+
__id_attributes__: ClassVar[tuple[str, ...]] = ("job_id",)
45+
4346

4447
class JobsMixin:
4548
@classmethod

src/databricks/labs/ucx/assessment/pipelines.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
from collections.abc import Iterable
44
from dataclasses import dataclass
5+
from typing import ClassVar
56

67
from databricks.labs.lsql.backends import SqlBackend
78
from databricks.sdk import WorkspaceClient
@@ -24,6 +25,8 @@ class PipelineInfo:
2425
creator_name: str | None = None
2526
"""User-name of the creator of the pipeline, if known."""
2627

28+
__id_attributes__: ClassVar[tuple[str, ...]] = ("pipeline_id",)
29+
2730

2831
class PipelinesCrawler(CrawlerBase[PipelineInfo], CheckClusterMixin):
2932
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):

src/databricks/labs/ucx/contexts/application.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,19 @@
3838
ComputeLocations,
3939
Grant,
4040
GrantsCrawler,
41+
GrantOwnership,
4142
MigrateGrants,
4243
PrincipalACL,
4344
)
4445
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
45-
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
46+
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationOwnership
4647
from databricks.labs.ucx.hive_metastore.table_migrate import (
4748
TableMigrationStatusRefresher,
4849
TablesMigrator,
4950
)
5051
from databricks.labs.ucx.hive_metastore.table_move import TableMove
51-
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler
52+
from databricks.labs.ucx.hive_metastore.tables import TableOwnership
53+
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership
5254
from databricks.labs.ucx.hive_metastore.verification import VerifyHasCatalog, VerifyHasMetastore
5355
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
5456
from databricks.labs.ucx.progress.install import VerifyProgressTracking
@@ -243,14 +245,26 @@ def group_manager(self) -> GroupManager:
243245
def grants_crawler(self) -> GrantsCrawler:
244246
return GrantsCrawler(self.tables_crawler, self.udfs_crawler, self.config.include_databases)
245247

248+
@cached_property
249+
def grant_ownership(self) -> GrantOwnership:
250+
return GrantOwnership(self.administrator_locator)
251+
246252
@cached_property
247253
def udfs_crawler(self) -> UdfsCrawler:
248254
return UdfsCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)
249255

256+
@cached_property
257+
def udf_ownership(self) -> UdfOwnership:
258+
return UdfOwnership(self.administrator_locator)
259+
250260
@cached_property
251261
def tables_crawler(self) -> TablesCrawler:
252262
return TablesCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)
253263

264+
@cached_property
265+
def table_ownership(self) -> TableOwnership:
266+
return TableOwnership(self.administrator_locator)
267+
254268
@cached_property
255269
def tables_migrator(self) -> TablesMigrator:
256270
return TablesMigrator(
@@ -363,6 +377,10 @@ def migration_status_refresher(self) -> TableMigrationStatusRefresher:
363377
self.tables_crawler,
364378
)
365379

380+
@cached_property
381+
def table_migration_ownership(self) -> TableMigrationOwnership:
382+
return TableMigrationOwnership(self.tables_crawler, self.table_ownership)
383+
366384
@cached_property
367385
def iam_credential_manager(self) -> CredentialManager:
368386
return CredentialManager(self.workspace_client)

src/databricks/labs/ucx/contexts/workflow_task.py

Lines changed: 129 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,35 @@
33

44
from databricks.labs.blueprint.installation import Installation
55
from databricks.labs.lsql.backends import RuntimeBackend, SqlBackend
6+
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus
67
from databricks.sdk import WorkspaceClient, core
78

89
from databricks.labs.ucx.__about__ import __version__
9-
from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler
10+
from databricks.labs.ucx.assessment.clusters import (
11+
ClustersCrawler,
12+
PoliciesCrawler,
13+
ClusterOwnership,
14+
ClusterInfo,
15+
ClusterPolicyOwnership,
16+
PolicyInfo,
17+
)
1018
from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptCrawler
11-
from databricks.labs.ucx.assessment.jobs import JobsCrawler, SubmitRunsCrawler
12-
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
19+
from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo, JobsCrawler, SubmitRunsCrawler
20+
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler, PipelineInfo, PipelineOwnership
1321
from databricks.labs.ucx.config import WorkspaceConfig
1422
from databricks.labs.ucx.contexts.application import GlobalContext
1523
from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler
24+
from databricks.labs.ucx.hive_metastore.grants import Grant
1625
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
17-
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
26+
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
27+
from databricks.labs.ucx.hive_metastore.udfs import Udf
1828
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
29+
from databricks.labs.ucx.progress.history import HistoryLog
1930
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder
2031

32+
# As with GlobalContext, service factories unavoidably have a lot of public methods.
33+
# pylint: disable=too-many-public-methods
34+
2135

2236
class RuntimeContext(GlobalContext):
2337
@cached_property
@@ -54,6 +68,10 @@ def installation(self) -> Installation:
5468
def jobs_crawler(self) -> JobsCrawler:
5569
return JobsCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
5670

71+
@cached_property
72+
def job_ownership(self) -> JobOwnership:
73+
return JobOwnership(self.administrator_locator)
74+
5775
@cached_property
5876
def submit_runs_crawler(self) -> SubmitRunsCrawler:
5977
return SubmitRunsCrawler(
@@ -67,10 +85,18 @@ def submit_runs_crawler(self) -> SubmitRunsCrawler:
6785
def clusters_crawler(self) -> ClustersCrawler:
6886
return ClustersCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
6987

88+
@cached_property
89+
def cluster_ownership(self) -> ClusterOwnership:
90+
return ClusterOwnership(self.administrator_locator)
91+
7092
@cached_property
7193
def pipelines_crawler(self) -> PipelinesCrawler:
7294
return PipelinesCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
7395

96+
@cached_property
97+
def pipeline_ownership(self) -> PipelineOwnership:
98+
return PipelineOwnership(self.administrator_locator)
99+
74100
@cached_property
75101
def table_size_crawler(self) -> TableSizeCrawler:
76102
return TableSizeCrawler(self.tables_crawler)
@@ -79,12 +105,18 @@ def table_size_crawler(self) -> TableSizeCrawler:
79105
def policies_crawler(self) -> PoliciesCrawler:
80106
return PoliciesCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
81107

108+
@cached_property
109+
def cluster_policy_ownership(self) -> ClusterPolicyOwnership:
110+
return ClusterPolicyOwnership(self.administrator_locator)
111+
82112
@cached_property
83113
def global_init_scripts_crawler(self) -> GlobalInitScriptCrawler:
84114
return GlobalInitScriptCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
85115

86116
@cached_property
87117
def tables_crawler(self) -> TablesCrawler:
118+
# Warning: Not all runtime contexts support the fast-scan implementation; it requires the JVM bridge to Spark
119+
# and that's not always available.
88120
return FasterTableScanCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)
89121

90122
@cached_property
@@ -116,10 +148,102 @@ def workflow_run_recorder(self) -> WorkflowRunRecorder:
116148
return WorkflowRunRecorder(
117149
self.sql_backend,
118150
self.config.ucx_catalog,
119-
workspace_id=self.workspace_client.get_workspace_id(),
151+
workspace_id=self.workspace_id,
120152
workflow_name=self.named_parameters["workflow"],
121153
workflow_id=int(self.named_parameters["job_id"]),
122154
workflow_run_id=int(self.named_parameters["parent_run_id"]),
123155
workflow_run_attempt=int(self.named_parameters.get("attempt", 0)),
124156
workflow_start_time=self.named_parameters["start_time"],
125157
)
158+
159+
@cached_property
160+
def workspace_id(self) -> int:
161+
return self.workspace_client.get_workspace_id()
162+
163+
@cached_property
164+
def historical_clusters_log(self) -> HistoryLog[ClusterInfo]:
165+
return HistoryLog(
166+
self.sql_backend,
167+
self.cluster_ownership,
168+
ClusterInfo,
169+
int(self.named_parameters["parent_run_id"]),
170+
self.workspace_id,
171+
self.config.ucx_catalog,
172+
)
173+
174+
@cached_property
175+
def historical_cluster_policies_log(self) -> HistoryLog[PolicyInfo]:
176+
return HistoryLog(
177+
self.sql_backend,
178+
self.cluster_policy_ownership,
179+
PolicyInfo,
180+
int(self.named_parameters["parent_run_id"]),
181+
self.workspace_id,
182+
self.config.ucx_catalog,
183+
)
184+
185+
@cached_property
186+
def historical_grants_log(self) -> HistoryLog[Grant]:
187+
return HistoryLog(
188+
self.sql_backend,
189+
self.grant_ownership,
190+
Grant,
191+
int(self.named_parameters["parent_run_id"]),
192+
self.workspace_id,
193+
self.config.ucx_catalog,
194+
)
195+
196+
@cached_property
197+
def historical_jobs_log(self) -> HistoryLog[JobInfo]:
198+
return HistoryLog(
199+
self.sql_backend,
200+
self.job_ownership,
201+
JobInfo,
202+
int(self.named_parameters["parent_run_id"]),
203+
self.workspace_id,
204+
self.config.ucx_catalog,
205+
)
206+
207+
@cached_property
208+
def historical_pipelines_log(self) -> HistoryLog[PipelineInfo]:
209+
return HistoryLog(
210+
self.sql_backend,
211+
self.pipeline_ownership,
212+
PipelineInfo,
213+
int(self.named_parameters["parent_run_id"]),
214+
self.workspace_id,
215+
self.config.ucx_catalog,
216+
)
217+
218+
@cached_property
219+
def historical_tables_log(self) -> HistoryLog[Table]:
220+
return HistoryLog(
221+
self.sql_backend,
222+
self.table_ownership,
223+
Table,
224+
int(self.named_parameters["parent_run_id"]),
225+
self.workspace_id,
226+
self.config.ucx_catalog,
227+
)
228+
229+
@cached_property
230+
def historical_table_migration_log(self) -> HistoryLog[TableMigrationStatus]:
231+
return HistoryLog(
232+
self.sql_backend,
233+
self.table_migration_ownership,
234+
TableMigrationStatus,
235+
int(self.named_parameters["parent_run_id"]),
236+
self.workspace_id,
237+
self.config.ucx_catalog,
238+
)
239+
240+
@cached_property
241+
def historical_udfs_log(self) -> HistoryLog[Udf]:
242+
return HistoryLog(
243+
self.sql_backend,
244+
self.udf_ownership,
245+
Udf,
246+
int(self.named_parameters["parent_run_id"]),
247+
self.workspace_id,
248+
self.config.ucx_catalog,
249+
)

src/databricks/labs/ucx/framework/crawlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,6 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool)
159159
self._update_snapshot(loaded_records, mode="overwrite")
160160
return loaded_records
161161

162-
def _update_snapshot(self, items: Sequence[Result], mode: Literal["append", "overwrite"] = "append") -> None:
162+
def _update_snapshot(self, items: Sequence[Result], *, mode: Literal["append", "overwrite"]) -> None:
163163
logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}")
164164
self._backend.save_table(self.full_name, items, self._klass, mode=mode)

src/databricks/labs/ucx/hive_metastore/grants.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from collections.abc import Callable, Iterable
44
from dataclasses import dataclass, replace
55
from functools import partial, cached_property
6-
from typing import Protocol
6+
from typing import ClassVar, Protocol
77

88
from databricks.labs.blueprint.installation import Installation
99
from databricks.labs.blueprint.parallel import ManyError, Threads
@@ -66,6 +66,8 @@ class Grant:
6666
any_file: bool = False
6767
anonymous_function: bool = False
6868

69+
__id_attributes__: ClassVar[tuple[str, ...]] = ("object_type", "object_key", "action_type", "principal")
70+
6971
@staticmethod
7072
def type_and_key(
7173
*,
@@ -105,6 +107,11 @@ def type_and_key(
105107
)
106108
raise ValueError(msg)
107109

110+
@property
111+
def object_type(self) -> str:
112+
this_type, _ = self.this_type_and_key()
113+
return this_type
114+
108115
@property
109116
def object_key(self) -> str:
110117
_, key = self.this_type_and_key()

src/databricks/labs/ucx/hive_metastore/table_migration_status.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
from dataclasses import dataclass, replace
44
from collections.abc import Iterable, KeysView
5+
from typing import ClassVar
56

67
from databricks.labs.lsql.backends import SqlBackend
78
from databricks.sdk import WorkspaceClient
@@ -25,6 +26,8 @@ class TableMigrationStatus:
2526
dst_table: str | None = None
2627
update_ts: str | None = None
2728

29+
__id_attributes__: ClassVar[tuple[str, ...]] = ("src_schema", "src_table")
30+
2831
def destination(self):
2932
return f"{self.dst_catalog}.{self.dst_schema}.{self.dst_table}".lower()
3033

src/databricks/labs/ucx/hive_metastore/tables.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
import re
33
import typing
4-
from collections.abc import Iterable, Iterator, Collection
4+
from collections.abc import Collection, Iterable, Iterator
55
from dataclasses import dataclass
66
from enum import Enum, auto
77
from functools import cached_property, partial
@@ -64,6 +64,8 @@ class Table: # pylint: disable=too-many-public-methods
6464
storage_properties: str | None = None
6565
is_partitioned: bool = False
6666

67+
__id_attributes__: typing.ClassVar[tuple[str, ...]] = ("catalog", "database", "name")
68+
6769
DBFS_ROOT_PREFIXES: typing.ClassVar[list[str]] = [
6870
"/dbfs/",
6971
"dbfs:/",

0 commit comments

Comments (0)