Crawlers: append snapshots to history journal, if available #2743

Merged
89 commits merged on Oct 23, 2024

Changes from 13 commits

Commits (89)
15dd48d
Introduce an optional history log, where crawler snapshots are journa…
asnare Sep 25, 2024
625f16a
Merge branch 'main' into crawler-snapshot-history
asnare Sep 25, 2024
622792e
Switch to integer identifiers for run_id and snapshot_id.
asnare Sep 25, 2024
3d31a1f
Merge branch 'main' into crawler-snapshot-history
asnare Sep 25, 2024
4ebf9b6
Update to store object data with first-level attributes exposed as a …
asnare Sep 25, 2024
d82e06b
Use a 56-bit random number for the snapshot_id.
asnare Sep 25, 2024
4d676bb
Switch to composite object identifier.
asnare Sep 25, 2024
cf48771
Formatting.
asnare Sep 25, 2024
e877532
Modify TODO to not trigger linter.
asnare Sep 25, 2024
b321903
Unit tests for crawler appending new snapshots to the history.
asnare Sep 25, 2024
02fc40f
Merge branch 'main' into crawler-snapshot-history
asnare Sep 30, 2024
6170a63
Ensure call-by-keyword, and indicate the return type.
asnare Sep 30, 2024
4b55625
Fix unit test.
asnare Sep 30, 2024
155cda7
Merge branch 'main' into crawler-snapshot-history
asnare Oct 9, 2024
260becf
Back out changes to the crawler that relate to the history.
asnare Oct 9, 2024
0efb3e5
Merge branch 'main' into crawler-snapshot-history
asnare Oct 9, 2024
6e649af
Merge branch 'main' into crawler-snapshot-history
asnare Oct 9, 2024
a2dd1d5
Merge branch 'main' into crawler-snapshot-history
asnare Oct 14, 2024
f897d98
Merge branch 'main' into crawler-snapshot-history
asnare Oct 15, 2024
4a5321c
Replace initial history record conversion and logging with a new vers…
asnare Oct 16, 2024
76aeefb
Merge branch 'main' into crawler-snapshot-history
asnare Oct 16, 2024
e321589
Mark inner dataclasses in tests as immutable, so they are safe to use…
asnare Oct 16, 2024
4e3e3c8
Fix type hint.
asnare Oct 16, 2024
616b0f1
Mark test class as immutable.
asnare Oct 16, 2024
a52720d
Unit tests for the failures[] mechanism.
asnare Oct 16, 2024
6f38192
When encoding a string field, also handle it being optional.
asnare Oct 16, 2024
af1fff2
Fix comparison.
asnare Oct 16, 2024
724abb2
Unit tests for the history log.
asnare Oct 16, 2024
dd73486
Remove dead code.
asnare Oct 16, 2024
31740dd
Merge branch 'main' into crawler-snapshot-history
asnare Oct 16, 2024
00d5d85
Type hint.
asnare Oct 16, 2024
57f63ba
Test detection of naive timestamps during encoding.
asnare Oct 16, 2024
3aa2431
Rename test argument to avoid shadowing a global.
asnare Oct 16, 2024
4c86ba6
Unit test for handling unserializable values.
asnare Oct 16, 2024
5c27d8b
Merge branch 'main' into crawler-snapshot-history
asnare Oct 17, 2024
39f1105
Inline trivial method.
asnare Oct 17, 2024
1d50ce1
Update error message on unserializable value to provide more context.
asnare Oct 17, 2024
d431322
Merge branch 'main' into crawler-snapshot-history
asnare Oct 17, 2024
95d2a48
Merge branch 'main' into crawler-snapshot-history
asnare Oct 17, 2024
4ce4ce4
Rename for consistency.
asnare Oct 17, 2024
7441077
Update HistoryLog initializer: it doesn't need a workspace client.
asnare Oct 18, 2024
07bc711
Update to object_id support to allow properties as well as fields.
asnare Oct 18, 2024
3b3e5bd
Tweak type signature: the snapshot to append can be any iterable type.
asnare Oct 18, 2024
a9dd77f
Unit tests for logging Table records into the history log.
asnare Oct 18, 2024
0f9a4cd
Update Grants to support logging into the history log.
asnare Oct 18, 2024
ad482b8
Update the migration progress workflow to log tables and grants to th…
asnare Oct 18, 2024
fd2b3ac
Unit tests for a migration-progress task that wasn't covered yet.
asnare Oct 18, 2024
9d4ed60
Merge branch 'main' into crawler-snapshot-history
asnare Oct 18, 2024
3703848
Support classes whose failures attribute is a string (containing JSON…
asnare Oct 18, 2024
62390b5
Ensure updated UDF snapshots are logged to the history table.
asnare Oct 18, 2024
c1fea83
Naming consistency.
asnare Oct 18, 2024
8c70541
Fix copypasta.
asnare Oct 18, 2024
6f41fc0
Ensure updated JobInfo snapshots are appended to the history log.
asnare Oct 18, 2024
927392a
Fix type hints.
asnare Oct 18, 2024
78f592d
Ensure updated ClusterInfo snapshots are stored in the history table.
asnare Oct 18, 2024
6dfed4c
Fix some test names.
asnare Oct 18, 2024
34b1e72
Ensure updated PipelineInfo snapshots are appended to the history table.
asnare Oct 18, 2024
77a06e1
Ensure updated Cluster Policy snapshots are logged to the history table.
asnare Oct 18, 2024
ac60675
Formatting.
asnare Oct 18, 2024
f00a06e
Fix docstring.
asnare Oct 18, 2024
4d6989c
Ensure that updated TableMigrationStatus snapshots are appended to th…
asnare Oct 18, 2024
17100d5
Update query to return at most a single record.
asnare Oct 21, 2024
78a9a13
Update integration test to verify that the history log is written to.
asnare Oct 21, 2024
21ad44d
Update history log to write to multiworkspace.historical instead of u…
asnare Oct 21, 2024
febc9ce
Ensure all tasks run on a UC-enabled cluster.
asnare Oct 21, 2024
383027d
Merge branch 'main' into crawler-snapshot-history
asnare Oct 21, 2024
e223612
Formatting.
asnare Oct 21, 2024
c938dd8
Split the crawling and history-log update across 2 tasks for the tabl…
asnare Oct 21, 2024
8a378dc
Note a limitation of the fast-scan table crawler.
asnare Oct 21, 2024
2e5869d
Factor out the ownership components so they can be used elsewhere.
asnare Oct 21, 2024
bf542a4
Mark the __id_attributes__ sequence as immutable.
asnare Oct 21, 2024
3aad8bf
Mark linters as still needing to be done.
asnare Oct 21, 2024
a990327
Merge branch 'main' into crawler-snapshot-history
asnare Oct 21, 2024
2381d18
Handle UDF failures, which aren't JSON-encoded as with other classes.
asnare Oct 21, 2024
23e7720
Sort imports.
asnare Oct 22, 2024
567d809
Test case (and fix) for when __id_attributes__ is annotated as None.
asnare Oct 22, 2024
f0bd963
Docstring explaining HistoricalEncoder design and intent.
asnare Oct 22, 2024
5b2ab77
Rename some things to align more closely with what they are.
asnare Oct 22, 2024
0ff2e95
Remove redundant type alternative.
asnare Oct 22, 2024
971cb4c
Docstring wording and formatting updates.
asnare Oct 22, 2024
be16738
Mention use-case for these records.
asnare Oct 22, 2024
2b9b476
Clarify reason for assumption.
asnare Oct 22, 2024
f17dc74
Document reason for non-default JSON separators.
asnare Oct 22, 2024
823c886
Detect and handle non-string values being passed in string-hinted fie…
asnare Oct 22, 2024
7377727
Merge branch 'main' into crawler-snapshot-history
asnare Oct 22, 2024
fdb03b2
Handle the remaining lsql-supported fields types.
asnare Oct 22, 2024
b7af858
All tasks in the workflow are supposed to depend on the assessment ha…
asnare Oct 22, 2024
9b489b9
Explicitly declare the dependency of the record_workflow_run on the t…
asnare Oct 23, 2024
0a2f4f2
Use correct method for converting rows to dictionaries.
asnare Oct 23, 2024
24 changes: 21 additions & 3 deletions src/databricks/labs/ucx/framework/crawlers.py
@@ -1,3 +1,5 @@
from __future__ import annotations
import datetime as dt
import logging
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Sequence
@@ -6,6 +8,7 @@
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk.errors import NotFound

from databricks.labs.ucx.framework.history import HistoryLog
from databricks.labs.ucx.framework.utils import escape_sql_identifier

logger = logging.getLogger(__name__)
@@ -21,13 +24,22 @@ class DataclassInstance(Protocol):


class CrawlerBase(ABC, Generic[Result]):
def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]):
def __init__(
self,
backend: SqlBackend,
catalog: str,
schema: str,
table: str,
klass: type[Result],
history_log: HistoryLog | None = None,

Collaborator:

A ... | None constructor-time dependency in a singleton is an anti-pattern, and it has led to undefined behaviors before.

If we look at things at a high level, the dependency diagram looks like:

flowchart TD
    history -->|optional!!!| crawler
    crawler --> history_appender
    history --> history_appender
    crawler --> snapshot
    history_appender --> append_snapshot
    append_snapshot --> snapshot

and it causes some suboptimal assumptions related to the migration_status table, as expressed in #2600 (comment).

This can be avoided if dependencies are built in a more explicit way:

    @job_task
    def crawl_tables(self, ctx: RuntimeContext) -> None:
        ctx.history_log.update_snapshot(ctx.tables_crawler)

    @job_task
    def crawl_udfs(self, ctx: RuntimeContext) -> None:
       ctx.history_log.update_snapshot(ctx.udfs_crawler)

and then update_snapshot would be something like the following, with clear and non-optional dependencies:

flowchart TD
    history --> update_snapshot
    crawler --> update_snapshot

    update_snapshot -.-> _appender
    crawler -.-> _appender
    history -.-> _appender
    crawler -.-> snapshot
    snapshot -.-> _appender

def update_snapshot(self, crawler: CrawlerBase[T]):
    appender = self._create_appender(crawler.get_klass())
    appender.persist(crawler.snapshot(force_refresh=True))

This will fit better with things like migration_status and dfsa_*.

Contributor Author:

The implementation has been replaced to fit this style: instead of being part of the crawlers, the history mechanism operates as a wrapper around a select few crawlers and is controlled explicitly by the relevant workflow tasks.

):
"""
Initializes a CrawlerBase instance.

Args:
backend (SqlBackend): The backend that executes SQL queries:
Statement Execution API or Databricks Runtime.
history_log: The (optional) history log where (new) snapshots should be saved.
catalog (str): The catalog name for the inventory persistence.
schema: The schema name for the inventory persistence.
table: The table name for the inventory persistence.
@@ -36,6 +48,7 @@ def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, k
self._schema = self._valid(schema)
self._table = self._valid(table)
self._backend = backend
self._history_appender = history_log.appender(klass) if history_log is not None else None
self._fetch = backend.fetch
self._exec = backend.execute
self._klass = klass
@@ -155,10 +168,15 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool)
except NotFound as e:
logger.debug("Inventory table not found", exc_info=e)
logger.debug(f"[{self.full_name}] crawling new set of snapshot data for {self._table}")
crawl_start_time = dt.datetime.now(tz=dt.timezone.utc)
loaded_records = list(loader())
self._update_snapshot(loaded_records, mode="overwrite")
self._update_snapshot(loaded_records, crawl_start_time=crawl_start_time, mode="overwrite")
return loaded_records

def _update_snapshot(self, items: Sequence[Result], mode: Literal["append", "overwrite"] = "append") -> None:
def _update_snapshot(
self, items: Sequence[Result], *, crawl_start_time: dt.datetime, mode: Literal["append", "overwrite"]
) -> None:
logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}")
self._backend.save_table(self.full_name, items, self._klass, mode=mode)
if self._history_appender:
self._history_appender.append_snapshot(items, run_start_time=crawl_start_time)
158 changes: 158 additions & 0 deletions src/databricks/labs/ucx/framework/history.py
@@ -0,0 +1,158 @@
from __future__ import annotations
import dataclasses
import datetime as dt
import json
import logging
import os
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from functools import cached_property
from typing import ClassVar, Protocol, TypeVar

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient


logger = logging.getLogger(__name__)


@dataclass(frozen=True, kw_only=True)
class HistoricalRecord:
workspace_id: int
"""The identifier of the workspace where this record was generated."""

run_id: int
"""The identifier of the workflow run that generated this record."""

snapshot_id: int
"""An identifier that is unique to the records produced for a given snapshot."""

run_start_time: dt.datetime
"""When this record was generated."""

object_type: str
"""The inventory table for which this record was generated."""

object_type_version: int
"""Versioning of inventory table, for forward compatibility."""

object_id: list[str]
"""The type-specific identifier for this inventory record."""

object_data: dict[str, str]
"""Type-specific data of the inventory record. Keys are top-level attributes, values are their JSON-encoded values."""

failures: list[str]
"""The list of problems associated with the object that this inventory record covers."""

owner: str
"""The identity of the account that created this inventory record."""


class DataclassInstance(Protocol):
__dataclass_fields__: ClassVar[dict]
# TODO: Once all record types provide the property: key_fields: ClassVar[Sequence[str]]


Record = TypeVar("Record", bound=DataclassInstance)


class HistoryLog:
__slots__ = ("_ws", "_backend", "_run_id", "_catalog", "_schema", "_table")

Contributor:

This would be the first UCX class that has it; why introduce it here?

Contributor Author:

It's not the first, but I agree it's rare. This is just a way of ensuring that attribute mistakes trigger errors instead of relying on ruff/mypy/pylint to flag them.

Contributor:

Okay, I guess it is good practice, but it's also kind of "tech debt" for the other classes. Could you create a tech-debt issue for this? We might not do it, but let's track it.

Collaborator:

We don't really do this in other places and it doesn't really add to readability; it's not quite consistent with the rest of the codebase.

Contributor Author:

The revised implementation does not use __slots__.
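
As a quick illustration of the point under discussion (generic Python, not code from this PR): with __slots__ declared, assigning to a misspelled attribute fails immediately at runtime instead of silently creating a new attribute.

class WithSlots:
    __slots__ = ("_run_id",)

    def __init__(self, run_id: int) -> None:
        self._run_id = run_id


class WithoutSlots:
    def __init__(self, run_id: int) -> None:
        self._run_id = run_id


WithoutSlots(1).run_idd = 42  # typo silently creates a stray attribute
WithSlots(1).run_idd = 42     # AttributeError: 'WithSlots' object has no attribute 'run_idd'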


def __init__(
self,
ws: WorkspaceClient,
backend: SqlBackend,
run_id: int,
catalog: str,
schema: str,
table: str,

Collaborator:

Suggested change (remove this parameter):
table: str,

) -> None:
self._ws = ws
self._backend = backend
self._run_id = run_id
self._catalog = catalog
self._schema = schema
self._table = table

Collaborator:

Suggested change: replace
self._table = table
with
self._table = "history"


@property
def full_name(self) -> str:
return f"{self._catalog}.{self._schema}.{self._table}"

def _append_history_snapshot(self, object_type: str, snapshot: list[HistoricalRecord]) -> None:
logger.debug(f"[{self.full_name}] appending {len(snapshot)} new records for {object_type}")
# Concurrent writes do not need to be handled here; appends cannot conflict.
# TODO: Although documented as conflict-free, verify that this truly is the case.
self._backend.save_table(self.full_name, snapshot, HistoricalRecord, mode="append")

class Appender:
__slots__ = ("_ws", "_object_type", "_object_type_version", "_key_from", "_run_id", "_persist")

def __init__(
self,
ws: WorkspaceClient,
run_id: int,
klass: type[Record],
key_from: Callable[[Record], list[str]],
persist: Callable[[str, list[HistoricalRecord]], None],
) -> None:
self._ws = ws
self._run_id = run_id
self._object_type = klass.__name__
# Versioning support: if the dataclass has a _ucx_version class attribute, that is taken as the current version.
self._object_type_version = getattr(klass, "_ucx_version") if hasattr(klass, "_ucx_version") else 0
self._key_from = key_from
self._persist = persist

@cached_property
def _workspace_id(self) -> int:
return self._ws.get_workspace_id()

@cached_property
def _owner(self) -> str:

Collaborator:

This is not an object owner; it's defined differently for every object type. There will probably need to be a separate PR to fix that and make it consistent.

Contributor Author:

We've created #2761 to track this. That PR blocks progress on this one.

Contributor Author:

In the new implementation (HistoricalEncoder) the owner property comes from the supplied Ownership implementation.
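
A minimal sketch of what such an Ownership hook might look like; the interface name and method below are assumptions rather than the final API, and the fallback simply mirrors the current-user lookup in the code that follows. Record is the type variable defined earlier in this module.

from typing import Protocol

from databricks.sdk import WorkspaceClient


class Ownership(Protocol):
    """Hypothetical interface: resolves the owner for an inventory record."""

    def owner_of(self, record: Record) -> str: ...


class CurrentUserOwnership:
    """Fallback that attributes every record to the user running the job."""

    def __init__(self, ws: WorkspaceClient) -> None:
        self._ws = ws

    def owner_of(self, _: Record) -> str:
        current_user = self._ws.current_user.me()
        return current_user.user_name or str(current_user.id)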

current_user = self._ws.current_user.me()
owner = current_user.user_name or current_user.id
assert owner
return owner

def append_snapshot(self, records: Sequence[Record], *, run_start_time: dt.datetime) -> None:
snapshot_id = int.from_bytes(os.urandom(7), byteorder="big")

Collaborator:

Suggested change (remove this line):
snapshot_id = int.from_bytes(os.urandom(7), byteorder="big")

Rely on the workflow run id.
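
For reference, 7 random bytes always yield a value below 2**56, which keeps the snapshot_id well within a signed 64-bit (BIGINT) column; that headroom is presumably the reason for choosing 56 bits, although the diff does not say so. A quick sanity check:

import os

snapshot_id = int.from_bytes(os.urandom(7), byteorder="big")
assert 0 <= snapshot_id < 2**56 < 2**63  # always fits a signed 64-bit column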

historical_records = [
self._inventory_record_to_historical(record, snapshot_id=snapshot_id, run_start_time=run_start_time)

Collaborator:

@JCZuurmond: in the parse_logs task, create the WorkflowRuns entities, where you record the run start time and run end time; otherwise the writer side gets too complicated. parse_logs is guaranteed to be executed at the end of every job run, so it's a good place to do so. Alternatively, add one more task, which is a bit more explicit.
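
A sketch of what such a WorkflowRuns record might carry; the class and field names below are guesses rather than the eventual schema:

import datetime as dt
from dataclasses import dataclass


@dataclass(frozen=True, kw_only=True)
class WorkflowRun:
    workspace_id: int
    workflow_run_id: int
    workflow_name: str
    run_start_time: dt.datetime
    run_end_time: dt.datetime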

for record in records
]
self._persist(self._object_type, historical_records)

def _inventory_record_to_historical(
self,
record: Record,
*,
snapshot_id: int,
run_start_time: dt.datetime,
) -> HistoricalRecord:
object_id = self._key_from(record)
object_as_dict = dataclasses.asdict(record)
flattened_object_data = {k: json.dumps(v) for k, v in object_as_dict.items()}
# TODO: Get failures.

Collaborator:

And parse them out of the JSON and remove them from object_as_dict, because object_data has to be dict[str, str] for the readers to be simpler.

Contributor Author:

This will work for the classes that have failures. (Not all do: 5 of the 9 currently being updated during migration-progress have it, 4 do not.)

Aside from these, ReconResult has an error_message attribute instead of failures, but it fulfils the same purpose. This will need to be special-cased.

Contributor Author:

This implementation is superseded by a new one. Encoding is handled by HistoricalEncoder: it treats failures as a special property and does not include it in the object data. If the class doesn't have a failures attribute, we set this to an empty list ([]) because it's a mandatory field.
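
A rough sketch of that approach, splitting failures out of the encoded object data; it is illustrative only, and the HistoricalEncoder introduced in later commits may differ in detail. Record is the type variable defined in this module.

import dataclasses
import json


def encode_record(record: Record) -> tuple[dict[str, str], list[str]]:
    """Split a record into JSON-encoded object data plus its failures list."""
    as_dict = dataclasses.asdict(record)
    raw_failures = as_dict.pop("failures", None)  # failures are excluded from object_data
    if isinstance(raw_failures, str):
        # Some classes store failures as a JSON-encoded string.
        failures = json.loads(raw_failures) if raw_failures else []
    else:
        failures = list(raw_failures) if raw_failures is not None else []
    object_data = {key: json.dumps(value) for key, value in as_dict.items()}
    return object_data, failures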

failures: list[str] = []
return HistoricalRecord(
workspace_id=self._workspace_id,
run_id=self._run_id,
snapshot_id=snapshot_id,
run_start_time=run_start_time,
object_type=self._object_type,
object_type_version=self._object_type_version,
object_id=object_id,
object_data=flattened_object_data,
failures=failures,
owner=self._owner,
)

def appender(self, klass: type[Record]) -> Appender:
key_fields = getattr(klass, "key_fields", ())

def key_from(record: Record) -> list[str]:
return [getattr(record, field) for field in key_fields]

return self.Appender(self._ws, self._run_id, klass, key_from, self._append_history_snapshot)
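
For orientation, the pieces added in this file fit together roughly as follows at this point in the review (later commits replace this mechanism with HistoricalEncoder). The catalog, schema and table names are placeholders, and ws, sql_backend and records are assumed to exist already:

import datetime as dt

run_id = 1234  # normally the current workflow run id
history_log = HistoryLog(ws, sql_backend, run_id, "catalog", "schema", "history")

# CrawlerBase wires the appender up itself when given a history_log, but it can
# also be driven directly:
appender = history_log.appender(Baz)  # Baz: an inventory record dataclass, as in the unit tests below
appender.append_snapshot(records, run_start_time=dt.datetime.now(tz=dt.timezone.utc))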
5 changes: 3 additions & 2 deletions src/databricks/labs/ucx/source_code/directfs_access.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import dataclasses
import datetime as dt
import logging
import sys
from collections.abc import Sequence, Iterable
@@ -97,14 +98,14 @@ def __init__(self, backend: SqlBackend, schema: str, table: str):
"""
super().__init__(backend=backend, catalog="hive_metastore", schema=schema, table=table, klass=DirectFsAccess)

def dump_all(self, dfsas: Sequence[DirectFsAccess]):
def dump_all(self, dfsas: Sequence[DirectFsAccess], *, crawl_start_time: dt.datetime) -> None:
"""This crawler doesn't follow the pull model because the fetcher fetches data for 2 crawlers, not just one
It's not **bad** because all records are pushed at once.
Providing a multi-entity crawler is out-of-scope of this PR
"""
try:
# TODO until we historize data, we append all DFSAs
self._update_snapshot(dfsas, mode="append")
self._update_snapshot(dfsas, crawl_start_time=crawl_start_time, mode="append")
except DatabricksError as e:
logger.error("Failed to store DFSAs", exc_info=e)

3 changes: 2 additions & 1 deletion src/databricks/labs/ucx/source_code/jobs.py
@@ -352,6 +352,7 @@ def __init__(
self._include_job_ids = include_job_ids

def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
crawl_start_time = datetime.now(tz=timezone.utc)
tasks = []
all_jobs = list(self._ws.jobs.list())
logger.info(f"Preparing {len(all_jobs)} linting tasks...")
@@ -374,7 +375,7 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
JobProblem,
mode='overwrite',
)
self._directfs_crawler.dump_all(job_dfsas)
self._directfs_crawler.dump_all(job_dfsas, crawl_start_time=crawl_start_time)
if len(errors) > 0:
raise ManyError(errors)

2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/source_code/queries.py
@@ -84,7 +84,7 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
)
for dfsa in all_dfsas
]
self._directfs_crawler.dump_all(all_dfsas)
self._directfs_crawler.dump_all(all_dfsas, crawl_start_time=assessment_start)

def _dashboard_ids_in_scope(self) -> list[str]:
if self._include_dashboard_ids is not None: # an empty list is accepted
31 changes: 27 additions & 4 deletions tests/unit/framework/test_crawlers.py
@@ -1,13 +1,14 @@
from collections.abc import Iterable
from dataclasses import dataclass
from unittest.mock import Mock
from unittest.mock import ANY, Mock, create_autospec

import pytest
from databricks.labs.lsql import Row
from databricks.labs.lsql.backends import MockBackend
from databricks.sdk.errors import NotFound

from databricks.labs.ucx.framework.crawlers import CrawlerBase, Result, ResultFn
from databricks.labs.ucx.framework.history import HistoryLog


@dataclass
@@ -37,11 +38,12 @@ def __init__(
schema: str,
table: str,
klass: type[Result],
history_log: HistoryLog | None = None,
*,
fetcher: ResultFn = lambda: [],
loader: ResultFn = lambda: [],
):
super().__init__(backend, catalog, schema, table, klass)
super().__init__(backend, catalog, schema, table, klass, history_log)
self._fetcher = fetcher
self._loader = loader

@@ -62,6 +64,14 @@ def test_full_name():
assert cb.full_name == "a.b.c"


def test_history_appender_setup() -> None:
"""Verify that initializing the crawler also initializes the appender."""
mock_history = create_autospec(HistoryLog)
_ = _CrawlerFixture(MockBackend(), "a", "b", "c", Baz, history_log=mock_history)

mock_history.appender.assert_called_once_with(Baz)


def test_snapshot_crawls_when_no_prior_crawl() -> None:
"""Check that the crawler is invoked when the fetcher reports that the inventory doesn't exist."""
mock_backend = MockBackend()
Expand Down Expand Up @@ -132,7 +142,7 @@ def test_snapshot_force_refresh_replaces_prior_data() -> None:
assert [Row(first="second", second=None)] == mock_backend.rows_written_for("a.b.c", mode="overwrite")


def test_snapshot_updates_existing_table() -> None:
def test_snapshot_updates_existing_inventory_table() -> None:
mock_backend = MockBackend()
cb = _CrawlerFixture[Baz](mock_backend, "a", "b", "c", Baz, loader=lambda: [Baz(first="first")])

Expand All @@ -142,7 +152,7 @@ def test_snapshot_updates_existing_table() -> None:
assert [Row(first="first", second=None)] == mock_backend.rows_written_for("a.b.c", "overwrite")


def test_snapshot_updates_new_table() -> None:
def test_snapshot_updates_new_inventory_table() -> None:
mock_backend = MockBackend()

def fetcher():
@@ -159,6 +169,19 @@ def fetcher():
assert [Row(first="first", second=True)] == mock_backend.rows_written_for("a.b.c", "overwrite")


def test_snapshot_appends_to_history() -> None:
"""Verify that when a snapshot is saved, it's also persisted in the history."""
mock_history = create_autospec(HistoryLog)
cb = _CrawlerFixture[Baz](
MockBackend(), "a", "b", "c", Baz, loader=lambda: [Baz(first="first")], history_log=mock_history
)

result = cb.snapshot()

assert [Baz(first="first")] == result
mock_history.appender(Baz).append_snapshot.assert_called_once_with([Baz(first="first")], run_start_time=ANY)


def test_snapshot_wrong_error() -> None:
sql_backend = MockBackend()

11 changes: 6 additions & 5 deletions tests/unit/source_code/test_directfs_access.py
@@ -1,4 +1,4 @@
from datetime import datetime
import datetime as dt

from databricks.labs.lsql.backends import MockBackend

@@ -12,19 +12,20 @@
def test_crawler_appends_dfsas():
backend = MockBackend()
crawler = DirectFsAccessCrawler.for_paths(backend, "schema")
now = dt.datetime.now(tz=dt.timezone.utc)
dfsas = list(
DirectFsAccess(
path=path,
is_read=False,
is_write=False,
source_id="ID",
source_timestamp=datetime.now(),
source_timestamp=now,
source_lineage=[LineageAtom(object_type="LINEAGE", object_id="ID")],
assessment_start_timestamp=datetime.now(),
assessment_end_timestamp=datetime.now(),
assessment_start_timestamp=now,
assessment_end_timestamp=now,
)
for path in ("a", "b", "c")
)
crawler.dump_all(dfsas)
crawler.dump_all(dfsas, crawl_start_time=now)
rows = backend.rows_written_for(crawler.full_name, "append")
assert len(rows) == 3