
Commit 15dd48d

Introduce an optional history log, where crawler snapshots are journalled.
Work in progress.
1 parent b16098c commit 15dd48d

6 files changed: +184 -12 lines changed

src/databricks/labs/ucx/framework/crawlers.py

Lines changed: 22 additions & 3 deletions
@@ -1,3 +1,5 @@
+from __future__ import annotations
+import datetime as dt
 import logging
 from abc import ABC, abstractmethod
 from collections.abc import Callable, Iterable, Sequence
@@ -6,6 +8,7 @@
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk.errors import NotFound

+from databricks.labs.ucx.framework.history import HistoryLog
 from databricks.labs.ucx.framework.utils import escape_sql_identifier

 logger = logging.getLogger(__name__)
@@ -21,13 +24,22 @@ class DataclassInstance(Protocol):


 class CrawlerBase(ABC, Generic[Result]):
-    def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]):
+    def __init__(
+        self,
+        backend: SqlBackend,
+        catalog: str,
+        schema: str,
+        table: str,
+        klass: type[Result],
+        history_log: HistoryLog | None = None,
+    ):
         """
         Initializes a CrawlerBase instance.

         Args:
             backend (SqlBackend): The backend that executes SQL queries:
                 Statement Execution API or Databricks Runtime.
+            history_log: The (optional) history log where (new) snapshots should be saved.
             catalog (str): The catalog name for the inventory persistence.
             schema: The schema name for the inventory persistence.
             table: The table name for the inventory persistence.
@@ -36,6 +48,7 @@ def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, k
         self._schema = self._valid(schema)
         self._table = self._valid(table)
         self._backend = backend
+        self._history_log = history_log
         self._fetch = backend.fetch
         self._exec = backend.execute
         self._klass = klass
@@ -155,10 +168,16 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool)
         except NotFound as e:
             logger.debug("Inventory table not found", exc_info=e)
         logger.debug(f"[{self.full_name}] crawling new set of snapshot data for {self._table}")
+        crawl_start_time = dt.datetime.now(tz=dt.timezone.utc)
         loaded_records = list(loader())
-        self._update_snapshot(loaded_records, mode="overwrite")
+        self._update_snapshot(loaded_records, crawl_start_time=crawl_start_time, mode="overwrite")
         return loaded_records

-    def _update_snapshot(self, items: Sequence[Result], mode: Literal["append", "overwrite"] = "append") -> None:
+    def _update_snapshot(
+        self, items: Sequence[Result], *, crawl_start_time: dt.datetime, mode: Literal["append", "overwrite"]
+    ) -> None:
         logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}")
         self._backend.save_table(self.full_name, items, self._klass, mode=mode)
+        if self._history_log:
+            appender = self._history_log.appender(self._klass)
+            appender.append_snapshot(items, run_start_time=crawl_start_time)
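
Taken together, these changes make journalling strictly opt-in: a crawler only writes history records when it is constructed with a history log. Below is a minimal sketch (not part of this commit) of how a concrete crawler could opt in; ThingCrawler, Thing, and the "things" table name are hypothetical, and the abstract crawl/fetch hooks of CrawlerBase are left out because this commit does not touch them.

# Sketch only: a hypothetical CrawlerBase subclass forwarding the new optional history_log.
from dataclasses import dataclass

from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.history import HistoryLog


@dataclass(frozen=True)
class Thing:
    name: str


class ThingCrawler(CrawlerBase[Thing]):
    def __init__(self, backend: SqlBackend, schema: str, history_log: HistoryLog | None = None):
        # Forwarding the (optional) history log is all that is needed: _update_snapshot()
        # then journals every refresh via history_log.appender(Thing).append_snapshot(...).
        super().__init__(backend, "hive_metastore", schema, "things", Thing, history_log=history_log)

    # The abstract crawl/fetch hooks of CrawlerBase are omitted here; they are unchanged by this commit.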

src/databricks/labs/ucx/framework/history.py

Lines changed: 150 additions & 0 deletions
@@ -0,0 +1,150 @@
+from __future__ import annotations
+import dataclasses
+import datetime as dt
+import json
+import logging
+import uuid
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass
+from functools import cached_property
+from typing import ClassVar, Protocol, TypeVar
+
+from databricks.labs.lsql.backends import SqlBackend
+from databricks.sdk import WorkspaceClient
+
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass(frozen=True, kw_only=True)
+class HistoricalRecord:
+    workspace_id: int
+    """The identifier of the workspace where this record was generated."""
+
+    run_id: str
+    """An identifier of the workflow run that generated this record."""
+
+    snapshot_id: str
+    """An identifier that is unique to the records produced for a given snapshot."""
+
+    run_start_time: dt.datetime
+    """When this record was generated."""
+
+    object_type: str
+    """The inventory table for which this record was generated."""
+
+    object_type_version: int
+    """Versioning of the inventory table, for forward compatibility."""
+
+    object_id: str
+    """The type-specific identifier for this inventory record."""
+
+    object_data: str
+    """Type-specific JSON-encoded data of the inventory record."""
+
+    failures: list[str]
+    """The list of problems associated with the object that this inventory record covers."""
+
+    owner: str
+    """The identity of the account that created this inventory record."""
+
+
+class DataclassInstance(Protocol):
+    __dataclass_fields__: ClassVar[dict]
+
+
+Record = TypeVar("Record", bound=DataclassInstance)
+
+
+class HistoryLog:
+    __slots__ = ("_ws", "_backend", "_run_id", "_catalog", "_schema", "_table")
+
+    def __init__(
+        self,
+        ws: WorkspaceClient,
+        backend: SqlBackend,
+        run_id: str,
+        catalog: str,
+        schema: str,
+        table: str,
+    ) -> None:
+        self._ws = ws
+        self._backend = backend
+        self._run_id = run_id
+        self._catalog = catalog
+        self._schema = schema
+        self._table = table
+
+    @property
+    def full_name(self) -> str:
+        return f"{self._catalog}.{self._schema}.{self._table}"
+
+    def _append_history_snapshot(self, object_type: str, snapshot: list[HistoricalRecord]) -> None:
+        logger.debug(f"[{self.full_name}] appending {len(snapshot)} new records for {object_type}")
+        # Concurrent writes do not need to be handled here; appends cannot conflict.
+        # TODO: Although documented as conflict-free, verify that this truly is the case.
+        self._backend.save_table(self.full_name, snapshot, HistoricalRecord, mode="append")
+
+    class Appender:
+        # No __slots__ here: functools.cached_property (used below) needs an instance __dict__ to cache into.
+
+        def __init__(
+            self,
+            ws: WorkspaceClient,
+            run_id: str,
+            klass: type[Record],
+            key_from: Callable[[Record], str],
+            persist: Callable[[str, list[HistoricalRecord]], None],
+        ) -> None:
+            self._ws = ws
+            self._run_id = run_id
+            self._object_type = klass.__name__
+            # Versioning support: if the dataclass has a _ucx_version class attribute, that is the current version.
+            self._object_type_version = getattr(klass, "_ucx_version") if hasattr(klass, "_ucx_version") else 0
+            self._key_from = key_from
+            self._persist = persist
+
+        @cached_property
+        def _workspace_id(self) -> int:
+            return self._ws.get_workspace_id()
+
+        @cached_property
+        def _owner(self) -> str:
+            current_user = self._ws.current_user.me()
+            owner = current_user.user_name or current_user.id
+            assert owner
+            return owner
+
+        def append_snapshot(self, records: Sequence[Record], *, run_start_time: dt.datetime) -> None:
+            snapshot_id = uuid.uuid4()
+            historical_records = [
+                self._inventory_record_to_historical(record, snapshot_id=snapshot_id, run_start_time=run_start_time)
+                for record in records
+            ]
+            self._persist(self._object_type, historical_records)
+
+        def _inventory_record_to_historical(
+            self, record: Record, *, snapshot_id: uuid.UUID, run_start_time: dt.datetime
+        ) -> HistoricalRecord:
+            object_id = self._key_from(record)
+            object_as_dict = dataclasses.asdict(record)
+            object_as_json = json.dumps(object_as_dict)
+            # TODO: Get failures.
+            failures: list[str] = []
+            return HistoricalRecord(
+                workspace_id=self._workspace_id,
+                run_id=self._run_id,
+                snapshot_id=str(snapshot_id),
+                run_start_time=run_start_time,
+                object_type=self._object_type,
+                object_type_version=self._object_type_version,
+                object_id=object_id,
+                object_data=object_as_json,
+                failures=failures,
+                owner=self._owner,
+            )
+
+    def appender(self, klass: type[Record]) -> Appender:
+        # TODO: Make this part of the protocol so the type-checker can enforce it.
+        key_from = getattr(klass, "key_fields")
+        return self.Appender(self._ws, self._run_id, klass, key_from, self._append_history_snapshot)
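
As a rough end-to-end sketch of the API above (not part of the commit): the record class, run id, table coordinates, and user values below are hypothetical stand-ins, the workspace client is mocked, and MockBackend stands in for a real SqlBackend. The record class supplies the key_fields callable that appender() looks up, and may carry an optional _ucx_version.

# Sketch only: journalling one snapshot directly through HistoryLog.appender().
import datetime as dt
from dataclasses import dataclass
from unittest.mock import create_autospec

from databricks.labs.lsql.backends import MockBackend
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.framework.history import HistoryLog


@dataclass(frozen=True)
class Thing:
    name: str

    _ucx_version = 1  # optional class attribute; the appender falls back to version 0 when absent

    @staticmethod
    def key_fields(record: "Thing") -> str:
        # Looked up via getattr(klass, "key_fields") and called per record to derive object_id.
        return record.name


ws = create_autospec(WorkspaceClient)  # mocked workspace client (hypothetical values below)
ws.get_workspace_id.return_value = 123
ws.current_user.me.return_value.user_name = "user@example.com"

backend = MockBackend()
history_log = HistoryLog(ws, backend, run_id="run-1", catalog="hive_metastore", schema="ucx", table="history")

appender = history_log.appender(Thing)
appender.append_snapshot(
    [Thing(name="a"), Thing(name="b")],
    run_start_time=dt.datetime.now(tz=dt.timezone.utc),
)

# Each record becomes one HistoricalRecord row appended to hive_metastore.ucx.history,
# with object_type="Thing", object_id derived by key_fields, and the record itself as JSON in object_data.
rows = backend.rows_written_for(history_log.full_name, "append")
assert len(rows) == 2

This is the call sequence that CrawlerBase._update_snapshot() performs whenever a history log was supplied.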

src/databricks/labs/ucx/source_code/directfs_access.py

Lines changed: 3 additions & 2 deletions
@@ -1,6 +1,7 @@
 from __future__ import annotations

 import dataclasses
+import datetime as dt
 import logging
 import sys
 from collections.abc import Sequence, Iterable
@@ -97,14 +98,14 @@ def __init__(self, backend: SqlBackend, schema: str, table: str):
         """
         super().__init__(backend=backend, catalog="hive_metastore", schema=schema, table=table, klass=DirectFsAccess)

-    def dump_all(self, dfsas: Sequence[DirectFsAccess]):
+    def dump_all(self, dfsas: Sequence[DirectFsAccess], crawl_start_time: dt.datetime):
        """This crawler doesn't follow the pull model because the fetcher fetches data for 2 crawlers, not just one
        It's not **bad** because all records are pushed at once.
        Providing a multi-entity crawler is out-of-scope of this PR
        """
         try:
             # TODO until we historize data, we append all DFSAs
-            self._update_snapshot(dfsas, mode="append")
+            self._update_snapshot(dfsas, crawl_start_time=crawl_start_time, mode="append")
         except DatabricksError as e:
             logger.error("Failed to store DFSAs", exc_info=e)

src/databricks/labs/ucx/source_code/jobs.py

Lines changed: 2 additions & 1 deletion
@@ -352,6 +352,7 @@ def __init__(
         self._include_job_ids = include_job_ids

     def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
+        crawl_start_time = datetime.now(tz=timezone.utc)
         tasks = []
         all_jobs = list(self._ws.jobs.list())
         logger.info(f"Preparing {len(all_jobs)} linting tasks...")
@@ -374,7 +375,7 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
             JobProblem,
             mode='overwrite',
         )
-        self._directfs_crawler.dump_all(job_dfsas)
+        self._directfs_crawler.dump_all(job_dfsas, crawl_start_time=crawl_start_time)
         if len(errors) > 0:
             raise ManyError(errors)

src/databricks/labs/ucx/source_code/queries.py

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
             )
             for dfsa in all_dfsas
         ]
-        self._directfs_crawler.dump_all(all_dfsas)
+        self._directfs_crawler.dump_all(all_dfsas, crawl_start_time=assessment_start)

     def _dashboard_ids_in_scope(self) -> list[str]:
         if self._include_dashboard_ids is not None:  # an empty list is accepted

tests/unit/source_code/test_directfs_access.py

Lines changed: 6 additions & 5 deletions
@@ -1,4 +1,4 @@
-from datetime import datetime
+import datetime as dt

 from databricks.labs.lsql.backends import MockBackend

@@ -12,19 +12,20 @@
 def test_crawler_appends_dfsas():
     backend = MockBackend()
     crawler = DirectFsAccessCrawler.for_paths(backend, "schema")
+    now = dt.datetime.now(tz=dt.timezone.utc)
     dfsas = list(
         DirectFsAccess(
             path=path,
             is_read=False,
             is_write=False,
             source_id="ID",
-            source_timestamp=datetime.now(),
+            source_timestamp=now,
             source_lineage=[LineageAtom(object_type="LINEAGE", object_id="ID")],
-            assessment_start_timestamp=datetime.now(),
-            assessment_end_timestamp=datetime.now(),
+            assessment_start_timestamp=now,
+            assessment_end_timestamp=now,
         )
         for path in ("a", "b", "c")
     )
-    crawler.dump_all(dfsas)
+    crawler.dump_all(dfsas, now)
     rows = backend.rows_written_for(crawler.full_name, "append")
     assert len(rows) == 3
