Skip to content

Commit 47c5f50

Browse files
committed
Introduce an optional history log, where crawler snapshots are journalled.
Work in progress.
1 parent b16098c commit 47c5f50

File tree

5 files changed

+34
-12
lines changed

5 files changed

+34
-12
lines changed

src/databricks/labs/ucx/framework/crawlers.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
import datetime as dt
13
import logging
24
from abc import ABC, abstractmethod
35
from collections.abc import Callable, Iterable, Sequence
@@ -6,6 +8,7 @@
68
from databricks.labs.lsql.backends import SqlBackend
79
from databricks.sdk.errors import NotFound
810

11+
from databricks.labs.ucx.framework.history import HistoryLog
912
from databricks.labs.ucx.framework.utils import escape_sql_identifier
1013

1114
logger = logging.getLogger(__name__)
@@ -21,13 +24,22 @@ class DataclassInstance(Protocol):
2124

2225

2326
class CrawlerBase(ABC, Generic[Result]):
24-
def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]):
27+
def __init__(
28+
self,
29+
backend: SqlBackend,
30+
catalog: str,
31+
schema: str,
32+
table: str,
33+
klass: type[Result],
34+
history_log: HistoryLog | None = None,
35+
):
2536
"""
2637
Initializes a CrawlerBase instance.
2738
2839
Args:
2940
backend (SqlBackend): The backend that executes SQL queries:
3041
Statement Execution API or Databricks Runtime.
42+
history_log: Optional history log where new snapshots are journalled.
3143
catalog (str): The catalog name for the inventory persistence.
3244
schema: The schema name for the inventory persistence.
3345
table: The table name for the inventory persistence.
@@ -36,6 +48,7 @@ def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, k
3648
self._schema = self._valid(schema)
3749
self._table = self._valid(table)
3850
self._backend = backend
51+
self._history_log = history_log
3952
self._fetch = backend.fetch
4053
self._exec = backend.execute
4154
self._klass = klass
@@ -155,10 +168,16 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool)
155168
except NotFound as e:
156169
logger.debug("Inventory table not found", exc_info=e)
157170
logger.debug(f"[{self.full_name}] crawling new set of snapshot data for {self._table}")
171+
crawl_start_time = dt.datetime.now(tz=dt.timezone.utc)
158172
loaded_records = list(loader())
159-
self._update_snapshot(loaded_records, mode="overwrite")
173+
self._update_snapshot(loaded_records, crawl_start_time=crawl_start_time, mode="overwrite")
160174
return loaded_records
161175

162-
def _update_snapshot(self, items: Sequence[Result], mode: Literal["append", "overwrite"] = "append") -> None:
176+
def _update_snapshot(
177+
self, items: Sequence[Result], *, crawl_start_time: dt.datetime, mode: Literal["append", "overwrite"]
178+
) -> None:
163179
logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}")
164180
self._backend.save_table(self.full_name, items, self._klass, mode=mode)
181+
if self._history_log:
182+
appender = self._history_log.appender(self._klass)
183+
appender.append_snapshot(items, run_start_time=crawl_start_time)

src/databricks/labs/ucx/source_code/directfs_access.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import dataclasses
4+
import datetime as dt
45
import logging
56
import sys
67
from collections.abc import Sequence, Iterable
@@ -97,14 +98,14 @@ def __init__(self, backend: SqlBackend, schema: str, table: str):
9798
"""
9899
super().__init__(backend=backend, catalog="hive_metastore", schema=schema, table=table, klass=DirectFsAccess)
99100

100-
def dump_all(self, dfsas: Sequence[DirectFsAccess]):
101+
def dump_all(self, dfsas: Sequence[DirectFsAccess], crawl_start_time: dt.datetime):
101102
"""This crawler doesn't follow the pull model because the fetcher fetches data for 2 crawlers, not just one.
102103
It's not **bad** because all records are pushed at once.
103104
Providing a multi-entity crawler is out-of-scope of this PR
104105
"""
105106
try:
106107
# TODO until we historize data, we append all DFSAs
107-
self._update_snapshot(dfsas, mode="append")
108+
self._update_snapshot(dfsas, crawl_start_time=crawl_start_time, mode="append")
108109
except DatabricksError as e:
109110
logger.error("Failed to store DFSAs", exc_info=e)
110111

src/databricks/labs/ucx/source_code/jobs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ def __init__(
352352
self._include_job_ids = include_job_ids
353353

354354
def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
355+
crawl_start_time = datetime.now(tz=timezone.utc)
355356
tasks = []
356357
all_jobs = list(self._ws.jobs.list())
357358
logger.info(f"Preparing {len(all_jobs)} linting tasks...")
@@ -374,7 +375,7 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
374375
JobProblem,
375376
mode='overwrite',
376377
)
377-
self._directfs_crawler.dump_all(job_dfsas)
378+
self._directfs_crawler.dump_all(job_dfsas, crawl_start_time=crawl_start_time)
378379
if len(errors) > 0:
379380
raise ManyError(errors)
380381

src/databricks/labs/ucx/source_code/queries.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
8484
)
8585
for dfsa in all_dfsas
8686
]
87-
self._directfs_crawler.dump_all(all_dfsas)
87+
self._directfs_crawler.dump_all(all_dfsas, crawl_start_time=assessment_start)
8888

8989
def _dashboard_ids_in_scope(self) -> list[str]:
9090
if self._include_dashboard_ids is not None: # an empty list is accepted

tests/unit/source_code/test_directfs_access.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from datetime import datetime
1+
import datetime as dt
22

33
from databricks.labs.lsql.backends import MockBackend
44

@@ -12,19 +12,20 @@
1212
def test_crawler_appends_dfsas():
1313
backend = MockBackend()
1414
crawler = DirectFsAccessCrawler.for_paths(backend, "schema")
15+
now = dt.datetime.now(tz=dt.timezone.utc)
1516
dfsas = list(
1617
DirectFsAccess(
1718
path=path,
1819
is_read=False,
1920
is_write=False,
2021
source_id="ID",
21-
source_timestamp=datetime.now(),
22+
source_timestamp=now,
2223
source_lineage=[LineageAtom(object_type="LINEAGE", object_id="ID")],
23-
assessment_start_timestamp=datetime.now(),
24-
assessment_end_timestamp=datetime.now(),
24+
assessment_start_timestamp=now,
25+
assessment_end_timestamp=now,
2526
)
2627
for path in ("a", "b", "c")
2728
)
28-
crawler.dump_all(dfsas)
29+
crawler.dump_all(dfsas, now)
2930
rows = backend.rows_written_for(crawler.full_name, "append")
3031
assert len(rows) == 3

0 commit comments

Comments
 (0)