Crawlers: append snapshots to history journal, if available #2743
Changes from 13 commits
databricks/labs/ucx/framework/crawlers.py:

```diff
@@ -1,3 +1,5 @@
+from __future__ import annotations
+import datetime as dt
 import logging
 from abc import ABC, abstractmethod
 from collections.abc import Callable, Iterable, Sequence
@@ -6,6 +8,7 @@
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk.errors import NotFound
 
+from databricks.labs.ucx.framework.history import HistoryLog
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
 
 logger = logging.getLogger(__name__)
@@ -21,13 +24,22 @@ class DataclassInstance(Protocol):
 
 
 class CrawlerBase(ABC, Generic[Result]):
-    def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]):
+    def __init__(
+        self,
+        backend: SqlBackend,
+        catalog: str,
+        schema: str,
+        table: str,
+        klass: type[Result],
+        history_log: HistoryLog | None = None,
+    ):
         """
         Initializes a CrawlerBase instance.
 
         Args:
             backend (SqlBackend): The backend that executes SQL queries:
                 Statement Execution API or Databricks Runtime.
+            history_log: The (optional) history log where (new) snapshots should be saved.
             catalog (str): The catalog name for the inventory persistence.
             schema: The schema name for the inventory persistence.
             table: The table name for the inventory persistence.
```

Review comment (on `history_log: HistoryLog | None = None,`): If we look at things at a high level, the dependency diagram looks like

```mermaid
flowchart TD
    history -->|optional!!!| crawler
    crawler --> history_appender
    history --> history_appender
    crawler --> snapshot
    history_appender --> append_snapshot
    append_snapshot --> snapshot
```

and it causes some suboptimal assumptions. This can be avoided if dependencies are built in a more explicit way, and then

```mermaid
flowchart TD
    history --> update_snapshot
    crawler --> update_snapshot
    update_snapshot -.-> _appender
    crawler -.-> _appender
    history -.-> _appender
    crawler -.-> snapshot
    snapshot -.-> _appender
```

this will fit better with things like …

Reply: The implementation has been replaced to fit with this style: instead of being part of the crawlers, the history mechanism will operate as a wrapper around a select few, controlled explicitly by the relevant workflow tasks.
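To make the suggested wiring concrete, here is a minimal, self-contained sketch of the explicit style the reviewer describes. All names below are illustrative, not from the PR: `snapshot` stands in for a bound `crawler.snapshot`, and `append` for a bound history appender.

```python
import datetime as dt
from collections.abc import Callable, Sequence


def update_snapshot_history(
    snapshot: Callable[[], Sequence],  # stand-in for a bound crawler.snapshot
    append: Callable[[Sequence, dt.datetime], None],  # stand-in for a bound appender
) -> None:
    # The workflow task composes the pieces explicitly, per snapshot.
    start = dt.datetime.now(tz=dt.timezone.utc)  # capture the run start time
    append(list(snapshot()), start)  # journal whatever the crawler produced


# Usage with trivial stand-ins:
update_snapshot_history(
    snapshot=lambda: ["record-1", "record-2"],
    append=lambda records, start: print(f"{len(records)} records @ {start}"),
)
```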
```diff
@@ -36,6 +48,7 @@ def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, k
         self._schema = self._valid(schema)
         self._table = self._valid(table)
         self._backend = backend
+        self._history_appender = history_log.appender(klass) if history_log is not None else None
         self._fetch = backend.fetch
         self._exec = backend.execute
         self._klass = klass
@@ -155,10 +168,15 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool)
         except NotFound as e:
             logger.debug("Inventory table not found", exc_info=e)
         logger.debug(f"[{self.full_name}] crawling new set of snapshot data for {self._table}")
+        crawl_start_time = dt.datetime.now(tz=dt.timezone.utc)
         loaded_records = list(loader())
-        self._update_snapshot(loaded_records, mode="overwrite")
+        self._update_snapshot(loaded_records, crawl_start_time=crawl_start_time, mode="overwrite")
         return loaded_records
 
-    def _update_snapshot(self, items: Sequence[Result], mode: Literal["append", "overwrite"] = "append") -> None:
+    def _update_snapshot(
+        self, items: Sequence[Result], *, crawl_start_time: dt.datetime, mode: Literal["append", "overwrite"]
+    ) -> None:
         logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}")
         self._backend.save_table(self.full_name, items, self._klass, mode=mode)
+        if self._history_appender:
+            self._history_appender.append_snapshot(items, run_start_time=crawl_start_time)
```
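For orientation, here is a self-contained sketch of the flow the hunk above introduces, using stand-in types (nothing below is PR code): the crawl start time is captured before loading, and the loaded records are journaled only when an appender was configured.

```python
import datetime as dt
from collections.abc import Callable, Sequence


class StubAppender:
    """Stand-in for the PR's HistoryLog.Appender; only the method signature matches."""

    def append_snapshot(self, items: Sequence, *, run_start_time: dt.datetime) -> None:
        print(f"journaling {len(items)} records for run started at {run_start_time}")


def refresh_snapshot(loader: Callable[[], Sequence], appender: StubAppender | None = None) -> list:
    crawl_start_time = dt.datetime.now(tz=dt.timezone.utc)  # before loading, as in the diff
    loaded_records = list(loader())
    # ... the real code saves the inventory table here with mode="overwrite" ...
    if appender:  # history journaling stays optional
        appender.append_snapshot(loaded_records, run_start_time=crawl_start_time)
    return loaded_records


refresh_snapshot(lambda: ["r1", "r2"], StubAppender())
```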
databricks/labs/ucx/framework/history.py (new file):

```diff
@@ -0,0 +1,158 @@
+from __future__ import annotations
+import dataclasses
+import datetime as dt
+import json
+import logging
+import os
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass
+from functools import cached_property
+from typing import ClassVar, Protocol, TypeVar
+
+from databricks.labs.lsql.backends import SqlBackend
+from databricks.sdk import WorkspaceClient
+
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass(frozen=True, kw_only=True)
+class HistoricalRecord:
+    workspace_id: int
+    """The identifier of the workspace where this record was generated."""
+
+    run_id: int
+    """The identifier of the workflow run that generated this record."""
+
+    snapshot_id: int
+    """An identifier that is unique to the records produced for a given snapshot."""
+
+    run_start_time: dt.datetime
+    """When this record was generated."""
+
+    object_type: str
+    """The inventory table for which this record was generated."""
+
+    object_type_version: int
+    """Versioning of inventory table, for forward compatibility."""
+
+    object_id: list[str]
+    """The type-specific identifier for this inventory record."""
+
+    object_data: dict[str, str]
+    """Type-specific data of the inventory record. Keys are top-level attributes, values are their JSON-encoded values."""
+
+    failures: list[str]
+    """The list of problems associated with the object that this inventory record covers."""
+
+    owner: str
+    """The identity of the account that created this inventory record."""
+
+
+class DataclassInstance(Protocol):
+    __dataclass_fields__: ClassVar[dict]
+    # TODO: Once all record types provide the property: key_fields: ClassVar[Sequence[str]]
+
+
+Record = TypeVar("Record", bound=DataclassInstance)
+
+
+class HistoryLog:
+    __slots__ = ("_ws", "_backend", "_run_id", "_catalog", "_schema", "_table")
```

Review comment (on `__slots__`): This would be the first UCX class to have it; why introduce it here?

Reply: It's not the first, but I agree it's rare. This is just a way of ensuring that attribute mistakes trigger errors instead of relying on ruff/mypy/pylint to flag them.

Reply: OK, I guess it is good practice, but it is also a kind of "tech debt" for other classes. Could you create a tech-debt issue for this? We might not do this, but let's track it.

Reply: We don't really do this in other places and it doesn't really add to readability; it's not quite consistent with the rest of the codebase.

Reply: The revised implementation does not use `__slots__`.
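For context on the trade-off being discussed: with `__slots__`, assigning to a misspelled attribute raises immediately instead of silently creating a new attribute. A quick illustration (not PR code):

```python
class WithSlots:
    __slots__ = ("_run_id",)

    def __init__(self, run_id: int) -> None:
        self._run_id = run_id


w = WithSlots(42)
try:
    w._run_idd = 7  # typo: __slots__ turns this into an AttributeError
except AttributeError as exc:
    print(exc)  # e.g. 'WithSlots' object has no attribute '_run_idd'
```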
```diff
+    def __init__(
+        self,
+        ws: WorkspaceClient,
+        backend: SqlBackend,
+        run_id: int,
+        catalog: str,
+        schema: str,
+        table: str,
+    ) -> None:
+        self._ws = ws
+        self._backend = backend
+        self._run_id = run_id
+        self._catalog = catalog
+        self._schema = schema
+        self._table = table
+
+    @property
+    def full_name(self) -> str:
+        return f"{self._catalog}.{self._schema}.{self._table}"
+
+    def _append_history_snapshot(self, object_type: str, snapshot: list[HistoricalRecord]) -> None:
+        logger.debug(f"[{self.full_name}] appending {len(snapshot)} new records for {object_type}")
+        # Concurrent writes do not need to be handled here; appends cannot conflict.
+        # TODO: Although documented as conflict-free, verify that this truly is the case.
+        self._backend.save_table(self.full_name, snapshot, HistoricalRecord, mode="append")
```
```diff
+    class Appender:
+        __slots__ = ("_ws", "_object_type", "_object_type_version", "_key_from", "_run_id", "_persist")
+
+        def __init__(
+            self,
+            ws: WorkspaceClient,
+            run_id: int,
+            klass: type[Record],
+            key_from: Callable[[Record], list[str]],
+            persist: Callable[[str, list[HistoricalRecord]], None],
+        ) -> None:
+            self._ws = ws
+            self._run_id = run_id
+            self._object_type = klass.__name__
+            # Versioning support: if the dataclass has a _ucx_version class attribute, that is the current version.
+            self._object_type_version = getattr(klass, "_ucx_version") if hasattr(klass, "_ucx_version") else 0
+            self._key_from = key_from
+            self._persist = persist
+
+        @cached_property
+        def _workspace_id(self) -> int:
+            return self._ws.get_workspace_id()
+
+        @cached_property
+        def _owner(self) -> str:
+            current_user = self._ws.current_user.me()
+            owner = current_user.user_name or current_user.id
+            assert owner
+            return owner
```

Review comment (on `_owner`): This is not an object owner; it's defined differently for every object type. There will probably need to be a separate PR to fix that and make it consistent.

Reply: We've created #2761 to track this. That PR blocks progress on this one.

Reply: In the new implementation (…)
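Worth noting for the discussion above: `cached_property` means the current-user lookup behind `_owner` runs at most once per `Appender`, not once per record. A minimal demonstration (illustrative, not PR code):

```python
from functools import cached_property


class OwnerLookup:
    calls = 0

    @cached_property
    def owner(self) -> str:
        OwnerLookup.calls += 1  # count how often the expensive lookup runs
        return "alice@example.com"


lookup = OwnerLookup()
_ = (lookup.owner, lookup.owner, lookup.owner)
print(OwnerLookup.calls)  # 1: the result is cached after the first access
```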
```diff
+        def append_snapshot(self, records: Sequence[Record], *, run_start_time: dt.datetime) -> None:
+            snapshot_id = int.from_bytes(os.urandom(7), byteorder="big")
+            historical_records = [
+                self._inventory_record_to_historical(record, snapshot_id=snapshot_id, run_start_time=run_start_time)
+                for record in records
+            ]
+            self._persist(self._object_type, historical_records)
```

Review comment (a suggested change on the `snapshot_id` line): rely on the workflow run id.
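On the snapshot id itself: 7 random bytes yield a non-negative integer below 2**56, presumably so that it always fits a signed 64-bit (BIGINT) column with margin; the PR does not state this rationale. A quick check:

```python
import os

snapshot_id = int.from_bytes(os.urandom(7), byteorder="big")
assert 0 <= snapshot_id < 2**56 < 2**63  # always fits a signed 64-bit column
print(snapshot_id)
```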
```diff
+        def _inventory_record_to_historical(
+            self,
+            record: Record,
+            *,
+            snapshot_id: int,
+            run_start_time: dt.datetime,
+        ) -> HistoricalRecord:
+            object_id = self._key_from(record)
+            object_as_dict = dataclasses.asdict(record)
+            flattened_object_data = {k: json.dumps(v) for k, v in object_as_dict.items()}
+            # TODO: Get failures.
+            failures: list[str] = []
+            return HistoricalRecord(
+                workspace_id=self._workspace_id,
+                run_id=self._run_id,
+                snapshot_id=snapshot_id,
+                run_start_time=run_start_time,
+                object_type=self._object_type,
+                object_type_version=self._object_type_version,
+                object_id=object_id,
+                object_data=flattened_object_data,
+                failures=failures,
+                owner=self._owner,
+            )
```

Review comment (on the failures TODO): … and parse them out of JSON; remove them from …

Reply: This will work for the classes that have failures. (Not all do: 5 of the 9 currently being updated during …) Aside from these, …

Reply: This implementation is superseded by a new one. Encoding is handled by …
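To illustrate what `_inventory_record_to_historical` does to `object_data` (each top-level attribute becomes a JSON-encoded string), here is a runnable example with a hypothetical record type that is not from the PR:

```python
import dataclasses
import json


@dataclasses.dataclass
class TableRecord:  # hypothetical inventory record
    catalog: str
    name: str
    failures: list[str]


record = TableRecord(catalog="main", name="sales", failures=[])
flattened = {k: json.dumps(v) for k, v in dataclasses.asdict(record).items()}
print(flattened)  # {'catalog': '"main"', 'name': '"sales"', 'failures': '[]'}
```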
```diff
+    def appender(self, klass: type[Record]) -> Appender:
+        key_fields = getattr(klass, "key_fields", ())
+
+        def key_from(record: Record) -> list[str]:
+            return [getattr(record, field) for field in key_fields]
+
+        return self.Appender(self._ws, self._run_id, klass, key_from, self._append_history_snapshot)
```
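Finally, the `key_fields` convention used by `appender()`: a record class advertises which of its fields form the object identifier, and the generated `key_from` extracts them in order. A hypothetical example (the `Grant` class below is illustrative, not from the PR):

```python
from dataclasses import dataclass
from typing import ClassVar


@dataclass
class Grant:  # hypothetical record type advertising its key fields
    key_fields: ClassVar[tuple[str, ...]] = ("principal", "object_key")
    principal: str
    object_key: str


grant = Grant(principal="alice@example.com", object_key="main.sales")
object_id = [getattr(grant, field) for field in Grant.key_fields]
print(object_id)  # ['alice@example.com', 'main.sales']
```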