Migration progress: include DFSA records in the history log #3039

Closed · 101 commits · changes from all commits
15dd48d
Introduce an optional history log, where crawler snapshots are journa…
asnare Sep 25, 2024
625f16a
Merge branch 'main' into crawler-snapshot-history
asnare Sep 25, 2024
622792e
Switch to integer identifiers for run_id and snapshot_id.
asnare Sep 25, 2024
3d31a1f
Merge branch 'main' into crawler-snapshot-history
asnare Sep 25, 2024
4ebf9b6
Update to store object data with first-level attributes exposed as a …
asnare Sep 25, 2024
d82e06b
Use a 56-bit random number for the snapshot_id.
asnare Sep 25, 2024
4d676bb
Switch to composite object identifier.
asnare Sep 25, 2024
cf48771
Formatting.
asnare Sep 25, 2024
e877532
Modify TODO to not trigger linter.
asnare Sep 25, 2024
b321903
Unit tests for crawler appending new snapshots to the history.
asnare Sep 25, 2024
02fc40f
Merge branch 'main' into crawler-snapshot-history
asnare Sep 30, 2024
6170a63
Ensure call-by-keyword, and indicate the return type.
asnare Sep 30, 2024
4b55625
Fix unit test.
asnare Sep 30, 2024
155cda7
Merge branch 'main' into crawler-snapshot-history
asnare Oct 9, 2024
260becf
Back out changes to the crawler that relate to the history.
asnare Oct 9, 2024
0efb3e5
Merge branch 'main' into crawler-snapshot-history
asnare Oct 9, 2024
6e649af
Merge branch 'main' into crawler-snapshot-history
asnare Oct 9, 2024
a2dd1d5
Merge branch 'main' into crawler-snapshot-history
asnare Oct 14, 2024
f897d98
Merge branch 'main' into crawler-snapshot-history
asnare Oct 15, 2024
4a5321c
Replace initial history record conversion and logging with a new vers…
asnare Oct 16, 2024
76aeefb
Merge branch 'main' into crawler-snapshot-history
asnare Oct 16, 2024
e321589
Mark inner dataclasses in tests as immutable, so they are safe to use…
asnare Oct 16, 2024
4e3e3c8
Fix type hint.
asnare Oct 16, 2024
616b0f1
Mark test class as immutable.
asnare Oct 16, 2024
a52720d
Unit tests for the failures[] mechanism.
asnare Oct 16, 2024
6f38192
When encoding a string field, also handle it being optional.
asnare Oct 16, 2024
af1fff2
Fix comparison.
asnare Oct 16, 2024
724abb2
Unit tests for the history log.
asnare Oct 16, 2024
dd73486
Remove dead code.
asnare Oct 16, 2024
31740dd
Merge branch 'main' into crawler-snapshot-history
asnare Oct 16, 2024
00d5d85
Type hint.
asnare Oct 16, 2024
57f63ba
Test detection of naive timestamps during encoding.
asnare Oct 16, 2024
3aa2431
Rename test argument to avoid shadowing a global.
asnare Oct 16, 2024
4c86ba6
Unit test for handling unserializable values.
asnare Oct 16, 2024
5c27d8b
Merge branch 'main' into crawler-snapshot-history
asnare Oct 17, 2024
39f1105
Inline trivial method.
asnare Oct 17, 2024
1d50ce1
Update error message on unserializable value to provide more context.
asnare Oct 17, 2024
d431322
Merge branch 'main' into crawler-snapshot-history
asnare Oct 17, 2024
95d2a48
Merge branch 'main' into crawler-snapshot-history
asnare Oct 17, 2024
4ce4ce4
Rename for consistency.
asnare Oct 17, 2024
7441077
Update HistoryLog initializer: it doesn't need a workspace client.
asnare Oct 18, 2024
07bc711
Update object_id support to allow properties as well as fields.
asnare Oct 18, 2024
3b3e5bd
Tweak type signature: the snapshot to append can be any iterable type.
asnare Oct 18, 2024
a9dd77f
Unit tests for logging Table records into the history log.
asnare Oct 18, 2024
0f9a4cd
Update Grants to support logging into the history log.
asnare Oct 18, 2024
ad482b8
Update the migration progress workflow to log tables and grants to th…
asnare Oct 18, 2024
fd2b3ac
Unit tests for a migration-progress task that wasn't covered yet.
asnare Oct 18, 2024
9d4ed60
Merge branch 'main' into crawler-snapshot-history
asnare Oct 18, 2024
3703848
Support classes whose failures attribute is a string (containing JSON…
asnare Oct 18, 2024
62390b5
Ensure updated UDF snapshots are logged to the history table.
asnare Oct 18, 2024
c1fea83
Naming consistency.
asnare Oct 18, 2024
8c70541
Fix copypasta.
asnare Oct 18, 2024
6f41fc0
Ensure updated JobInfo snapshots are appended to the history log.
asnare Oct 18, 2024
927392a
Fix type hints.
asnare Oct 18, 2024
78f592d
Ensure updated ClusterInfo snapshots are stored in the history table.
asnare Oct 18, 2024
6dfed4c
Fix some test names.
asnare Oct 18, 2024
34b1e72
Ensure updated PipelineInfo snapshots are appended to the history table.
asnare Oct 18, 2024
77a06e1
Ensure updated Cluster Policy snapshots are logged to the history table.
asnare Oct 18, 2024
ac60675
Formatting.
asnare Oct 18, 2024
f00a06e
Fix docstring.
asnare Oct 18, 2024
4d6989c
Ensure that updated TableMigrationStatus snapshots are appended to th…
asnare Oct 18, 2024
17100d5
Update query to return at most a single record.
asnare Oct 21, 2024
78a9a13
Update integration test to verify that the history log is written to.
asnare Oct 21, 2024
21ad44d
Update history log to write to multiworkspace.historical instead of u…
asnare Oct 21, 2024
febc9ce
Ensure all tasks run on a UC-enabled cluster.
asnare Oct 21, 2024
383027d
Merge branch 'main' into crawler-snapshot-history
asnare Oct 21, 2024
e223612
Formatting.
asnare Oct 21, 2024
c938dd8
Split the crawling and history-log update across 2 tasks for the tabl…
asnare Oct 21, 2024
8a378dc
Note a limitation of the fast-scan table crawler.
asnare Oct 21, 2024
2e5869d
Factor out the ownership components so they can be used elsewhere.
asnare Oct 21, 2024
bf542a4
Mark the __id_attributes__ sequence as immutable.
asnare Oct 21, 2024
3aad8bf
Mark linters as still needing to be done.
asnare Oct 21, 2024
a990327
Merge branch 'main' into crawler-snapshot-history
asnare Oct 21, 2024
2381d18
Handle UDF failures, which aren't JSON-encoded as with other classes.
asnare Oct 21, 2024
23e7720
Sort imports.
asnare Oct 22, 2024
567d809
Test case (and fix) for when __id_attributes__ is annotated as None.
asnare Oct 22, 2024
f0bd963
Docstring explaining HistoricalEncoder design and intent.
asnare Oct 22, 2024
5b2ab77
Rename some things to align more closely with what they are.
asnare Oct 22, 2024
0ff2e95
Remove redundant type alternative.
asnare Oct 22, 2024
971cb4c
Docstring wording and formatting updates.
asnare Oct 22, 2024
be16738
Mention use-case for these records.
asnare Oct 22, 2024
2b9b476
Clarify reason for assumption.
asnare Oct 22, 2024
f17dc74
Document reason for non-default JSON separators.
asnare Oct 22, 2024
823c886
Detect and handle non-string values being passed in string-hinted fie…
asnare Oct 22, 2024
7377727
Merge branch 'main' into crawler-snapshot-history
asnare Oct 22, 2024
fdb03b2
Handle the remaining lsql-supported field types.
asnare Oct 22, 2024
b7af858
All tasks in the workflow are supposed to depend on the assessment ha…
asnare Oct 22, 2024
7db78d5
Support encoding DirectFsAccess records as history records.
asnare Oct 22, 2024
2a34af1
Update some HistoryEncoding unit tests to include (indirect) string t…
asnare Oct 22, 2024
207760a
Ensure the migration-progress workflow also captures changes to the D…
asnare Oct 22, 2024
9b489b9
Explicitly declare the dependency of the record_workflow_run on the t…
asnare Oct 23, 2024
49a37bb
Merge branch 'crawler-snapshot-history' into snapshot-history-dfsa
asnare Oct 23, 2024
92bbbf2
Update the workflow recorder task to also wait for the linting refres…
asnare Oct 23, 2024
0a2f4f2
Use correct method for converting rows to dictionaries.
asnare Oct 23, 2024
0a2e1ba
Merge branch 'crawler-snapshot-history' into snapshot-history-dfsa
asnare Oct 23, 2024
2f3c54a
Merge branch 'main' into snapshot-history-dfsa
asnare Oct 23, 2024
f75755c
Merge branch 'main' into snapshot-history-dfsa
asnare Oct 23, 2024
fa24879
Merge branch 'main' into snapshot-history-dfsa
asnare Oct 23, 2024
b69340a
Work around the Spark-based LSQL backend returning naive datetime ins…
asnare Oct 23, 2024
82105ca
Fix import mistake and linting issue.
asnare Oct 23, 2024
89a9f37
Merge branch 'main' into snapshot-history-dfsa
asnare Oct 24, 2024
10 changes: 8 additions & 2 deletions src/databricks/labs/ucx/contexts/application.py
@@ -6,6 +6,8 @@
 from functools import cached_property
 from pathlib import Path
 
+from databricks.labs.ucx.source_code.base import DirectFsAccess
+
 from databricks.labs.blueprint.installation import Installation
 from databricks.labs.blueprint.installer import InstallState
 from databricks.labs.blueprint.tui import Prompts
@@ -17,7 +19,7 @@
 from databricks.labs.ucx.recon.metadata_retriever import DatabricksTableMetadataRetriever
 from databricks.labs.ucx.recon.migration_recon import MigrationRecon
 from databricks.labs.ucx.recon.schema_comparator import StandardSchemaComparator
-from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
+from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler, DirectFsAccessOwnership
 from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver
 from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
 from databricks.sdk import AccountClient, WorkspaceClient, core
@@ -28,7 +30,7 @@
 from databricks.labs.ucx.assessment.export import AssessmentExporter
 from databricks.labs.ucx.aws.credentials import CredentialManager
 from databricks.labs.ucx.config import WorkspaceConfig
-from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership
+from databricks.labs.ucx.framework.owners import AdministratorLocator, Ownership, WorkspacePathOwnership
 from databricks.labs.ucx.hive_metastore import ExternalLocations, MountsCrawler, TablesCrawler
 from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
 from databricks.labs.ucx.hive_metastore.grants import (
@@ -503,6 +505,10 @@ def directfs_access_crawler_for_paths(self) -> DirectFsAccessCrawler:
     def directfs_access_crawler_for_queries(self) -> DirectFsAccessCrawler:
         return DirectFsAccessCrawler.for_queries(self.sql_backend, self.inventory_database)
 
+    @cached_property
+    def directfs_access_ownership(self) -> Ownership[DirectFsAccess]:
+        return DirectFsAccessOwnership(self.administrator_locator)
+
     @cached_property
     def used_tables_crawler_for_paths(self):
         return UsedTablesCrawler.for_paths(self.sql_backend, self.inventory_database)
13 changes: 13 additions & 0 deletions src/databricks/labs/ucx/contexts/workflow_task.py
@@ -1,6 +1,8 @@
 from functools import cached_property
 from pathlib import Path
 
+from databricks.labs.ucx.source_code.base import DirectFsAccess
+
 from databricks.labs.blueprint.installation import Installation
 from databricks.labs.lsql.backends import RuntimeBackend, SqlBackend
 from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus
@@ -182,6 +184,17 @@ def historical_cluster_policies_log(self) -> HistoryLog[PolicyInfo]:
             self.config.ucx_catalog,
         )
 
+    @cached_property
+    def historical_directfs_access_log(self) -> HistoryLog[DirectFsAccess]:
+        return HistoryLog(
+            self.sql_backend,
+            self.directfs_access_ownership,
+            DirectFsAccess,
+            int(self.named_parameters["parent_run_id"]),
+            self.workspace_id,
+            self.config.ucx_catalog,
+        )
+
     @cached_property
     def historical_grants_log(self) -> HistoryLog[Grant]:
         return HistoryLog(
9 changes: 8 additions & 1 deletion src/databricks/labs/ucx/progress/history.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 import dataclasses
 import datetime as dt
+import typing
 from enum import Enum, EnumMeta
 import json
 import logging
@@ -100,7 +101,13 @@ def _get_field_names_with_types(cls, klass: type[Record]) -> tuple[dict[str, typ
         - A dictionary of fields to include in the object data, and their type.
         - The type of the failures field, if present.
         """
-        field_names_with_types = {field.name: field.type for field in dataclasses.fields(klass)}
+        # Ignore the field types returned by dataclasses.fields(): it doesn't resolve string-based annotations (which
+        # are produced automatically in a __future__.__annotations__ context). Unfortunately the dataclass mechanism
+        # captures the type hints prior to resolution (which happens later in the class initialization process).
+        # As such, we rely on dataclasses.fields() for the set of field names, but not the types which we fetch directly.
+        klass_type_hints = typing.get_type_hints(klass)
+        field_names = [field.name for field in dataclasses.fields(klass)]
+        field_names_with_types = {field_name: klass_type_hints[field_name] for field_name in field_names}
        if "failures" not in field_names_with_types:
             failures_type = None
         else:
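
For context on the hunk above, here is a minimal standalone sketch (illustrative only, not code from this PR) of the annotation behaviour the new comment describes: under `from __future__ import annotations`, `dataclasses.fields()` reports each field's type as an unresolved string, while `typing.get_type_hints()` resolves it to the real type.

from __future__ import annotations

import dataclasses
import typing


@dataclasses.dataclass
class _Example:
    name: str
    count: int


# dataclasses captured the raw annotation strings: prints [('name', 'str'), ('count', 'int')]
print([(f.name, f.type) for f in dataclasses.fields(_Example)])

# get_type_hints() resolves them: prints {'name': <class 'str'>, 'count': <class 'int'>}
print(typing.get_type_hints(_Example))
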
16 changes: 12 additions & 4 deletions src/databricks/labs/ucx/progress/workflows.py
@@ -146,19 +146,25 @@ def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
         migration_status_snapshot = ctx.migration_status_refresher.snapshot(force_refresh=True)
         history_log.append_inventory_snapshot(migration_status_snapshot)
 
-    @job_task(depends_on=[verify_prerequisites])
+    @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration")
     def assess_dashboards(self, ctx: RuntimeContext):
         """Scans all dashboards for migration issues in SQL code of embedded widgets.
         Also stores direct filesystem accesses for display in the migration dashboard."""
-        # TODO: Ensure these are captured in the history log.
+        history_log = ctx.historical_directfs_access_log
         ctx.query_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
+        directfs_access_snapshot = ctx.directfs_access_crawler_for_queries.snapshot()
+        # Note: The object-type is DirectFsAccess, the same as the workflow version.
+        history_log.append_inventory_snapshot(directfs_access_snapshot)
 
-    @job_task(depends_on=[verify_prerequisites])
+    @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration")
     def assess_workflows(self, ctx: RuntimeContext):
         """Scans all jobs for migration issues in notebooks.
         Also stores direct filesystem accesses for display in the migration dashboard."""
-        # TODO: Ensure these are captured in the history log.
+        history_log = ctx.historical_directfs_access_log
         ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
+        directfs_access_snapshot = ctx.directfs_access_crawler_for_paths.snapshot()
+        # Note: the object-type is DirectFsAccess, the same as the query version.
+        history_log.append_inventory_snapshot(directfs_access_snapshot)
 
     @job_task(
         depends_on=[
@@ -167,7 +173,9 @@ def assess_workflows(self, ctx: RuntimeContext):
             crawl_udfs,
             assess_jobs,
             assess_clusters,
+            assess_dashboards,
             assess_pipelines,
+            assess_workflows,
             crawl_cluster_policies,
             refresh_table_migration_status,
             update_tables_history_log,
23 changes: 17 additions & 6 deletions src/databricks/labs/ucx/source_code/base.py
@@ -5,12 +5,13 @@
 import io
 import logging
 import sys
+import warnings
 from abc import abstractmethod, ABC
 from collections.abc import Iterable
 from dataclasses import dataclass, field
-from datetime import datetime
+from datetime import datetime, timezone
 from pathlib import Path
-from typing import Any, BinaryIO, TextIO
+from typing import Any, BinaryIO, TextIO, ClassVar
 
 from astroid import NodeNG  # type: ignore
 from sqlglot import Expression, parse as parse_sql
@@ -190,15 +191,25 @@ def from_dict(cls, data: dict[str, Any]) -> Self:
         if isinstance(source_lineage, list) and len(source_lineage) > 0 and isinstance(source_lineage[0], dict):
             lineage_atoms = [LineageAtom(**lineage) for lineage in source_lineage]
             data["source_lineage"] = lineage_atoms
+        # Some LSQL backends return naive datetime instances; work around downstream issues by attaching UTC.
+        for field_name in ("source_timestamp", "assessment_start_timestamp", "assessment_end_timestamp"):
+            value = data.get(field_name, None)
+            if value is None or value.tzinfo is not None:
+                continue
+            warnings.warn(f"Naive datetime detected; should have time-zone associated: {field_name}")
+            data[field_name] = value.replace(tzinfo=timezone.utc)
         return cls(**data)
 
-    UNKNOWN = "unknown"
+    UNKNOWN: ClassVar[str] = "unknown"
+    NO_TS: ClassVar[datetime] = datetime.fromtimestamp(0, tz=timezone.utc)
 
     source_id: str = UNKNOWN
-    source_timestamp: datetime = datetime.fromtimestamp(0)
+    source_timestamp: datetime = NO_TS
     source_lineage: list[LineageAtom] = field(default_factory=list)
-    assessment_start_timestamp: datetime = datetime.fromtimestamp(0)
-    assessment_end_timestamp: datetime = datetime.fromtimestamp(0)
+    assessment_start_timestamp: datetime = NO_TS
+    assessment_end_timestamp: datetime = NO_TS
+
+    __id_attributes__: ClassVar[tuple[str, ...]] = ("source_id",)
 
     def replace_source(
         self,
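
Two details in this hunk deserve a standalone illustration (a sketch under plain-dataclass assumptions, not code from this PR): `ClassVar` annotations keep the `UNKNOWN` and `NO_TS` constants from being picked up as dataclass fields, and `replace(tzinfo=...)` is the mechanism `from_dict()` uses above to upgrade naive timestamps to UTC.

import datetime as dt
from dataclasses import dataclass, fields
from typing import ClassVar


@dataclass
class _Record:
    # ClassVar annotations are class-level constants; dataclasses skips them.
    UNKNOWN: ClassVar[str] = "unknown"
    NO_TS: ClassVar[dt.datetime] = dt.datetime.fromtimestamp(0, tz=dt.timezone.utc)

    source_id: str = UNKNOWN
    source_timestamp: dt.datetime = NO_TS


# Only the real fields are present:
assert [f.name for f in fields(_Record)] == ["source_id", "source_timestamp"]

# Attaching UTC to a naive datetime, as from_dict() does above:
naive = dt.datetime(2024, 10, 23, 12, 0)
assert naive.tzinfo is None
aware = naive.replace(tzinfo=dt.timezone.utc)
assert aware.tzinfo is dt.timezone.utc
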
3 changes: 1 addition & 2 deletions src/databricks/labs/ucx/source_code/directfs_access.py
@@ -40,8 +40,7 @@ def dump_all(self, dfsas: Sequence[DirectFsAccess]) -> None:
         Providing a multi-entity crawler is out-of-scope of this PR
         """
         try:
-            # TODO until we historize data, we append all DFSAs
-            self._update_snapshot(dfsas, mode="append")
+            self._update_snapshot(dfsas, mode="overwrite")
         except DatabricksError as e:
             logger.error("Failed to store DFSAs", exc_info=e)
 
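
A quick sketch of the behavioural change, reusing the MockBackend pattern from the unit tests below (the record values are illustrative): now that older records are journaled to the history log, `dump_all()` replaces the stored snapshot instead of appending to it.

from databricks.labs.lsql.backends import MockBackend

from databricks.labs.ucx.source_code.base import DirectFsAccess
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler

backend = MockBackend()
crawler = DirectFsAccessCrawler.for_paths(backend, "schema")
crawler.dump_all([DirectFsAccess(path="/some/path", is_read=True, is_write=False)])
# Rows are now written with mode="overwrite" rather than mode="append":
assert len(backend.rows_written_for(crawler.full_name, "overwrite")) == 1
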
6 changes: 3 additions & 3 deletions tests/unit/progress/test_history.py
@@ -118,7 +118,7 @@ def test_historical_encoder_object_id(ownership) -> None:
     class _CompoundKey:
         a_field: str = "field-a"
         b_field: str = "field-b"
-        c_field: str = "field-c"
+        c_field: "str" = "field-c"  # Annotations can be strings as well.
 
         @property
         def d_property(self) -> str:
@@ -270,7 +270,7 @@ def test_historical_encoder_object_data_values_strings_as_is(ownership) -> None:
     @dataclass
     class _AClass:
         a_field: str = "value"
-        existing_json_field: str = "[1, 2, 3]"
+        existing_json_field: "str" = "[1, 2, 3]"
         optional_string_field: str | None = "value"
 
         __id_attributes__: ClassVar = ("a_field",)
@@ -481,7 +481,7 @@ class _BrokenFailures2:
     __id_attributes__: ClassVar = ("a_field",)
 
 
-@pytest.mark.parametrize("klass,broken_type", ((_BrokenFailures1, list[int]), (_BrokenFailures2, None)))
+@pytest.mark.parametrize("klass,broken_type", ((_BrokenFailures1, list[int]), (_BrokenFailures2, type(None))))
 def test_historical_encoder_failures_verification(
     ownership,
     klass: type[DataclassWithIdAttributes],
34 changes: 29 additions & 5 deletions tests/unit/progress/test_workflows.py
@@ -72,18 +72,42 @@ def test_migration_progress_runtime_tables_refresh(run_workflow) -> None:
 
 
 @pytest.mark.parametrize(
-    "task, linter",
+    "task, linter, crawler, history_log",
     (
-        (MigrationProgress.assess_dashboards, RuntimeContext.query_linter),
-        (MigrationProgress.assess_workflows, RuntimeContext.workflow_linter),
+        (
+            MigrationProgress.assess_dashboards,
+            RuntimeContext.query_linter,
+            RuntimeContext.directfs_access_crawler_for_queries,
+            RuntimeContext.historical_directfs_access_log,
+        ),
+        (
+            MigrationProgress.assess_workflows,
+            RuntimeContext.workflow_linter,
+            RuntimeContext.directfs_access_crawler_for_paths,
+            RuntimeContext.historical_directfs_access_log,
+        ),
     ),
 )
-def test_linter_runtime_refresh(run_workflow, task, linter) -> None:
+def test_linter_runtime_refresh(run_workflow, task, linter, crawler, history_log) -> None:
     linter_class = get_type_hints(linter.func)["return"]
+    crawler_class = get_type_hints(crawler.func)["return"]
     mock_linter = create_autospec(linter_class)
+    mock_crawler = create_autospec(crawler_class)
+    mock_history_log = create_autospec(HistoryLog)
     linter_name = linter.attrname
-    ctx = run_workflow(task, **{linter_name: mock_linter})
+    crawler_name = crawler.attrname
+    history_log_name = history_log.attrname
+    context_replacements = {
+        linter_name: mock_linter,
+        crawler_name: mock_crawler,
+        history_log_name: mock_history_log,
+        "named_parameters": {"parent_run_id": 53},
+    }
+    ctx = run_workflow(task, **context_replacements)
 
     mock_linter.refresh_report.assert_called_once_with(ctx.sql_backend, ctx.inventory_database)
+    mock_crawler.snapshot.assert_called_once_with()
+    mock_history_log.append_inventory_snapshot.assert_called_once()
 
 
 def test_migration_progress_with_valid_prerequisites(run_workflow) -> None:
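
The parametrization above leans on a `functools.cached_property` detail worth spelling out: accessed on the class (e.g. `RuntimeContext.query_linter`), the descriptor returns itself, so the test can read `.func` (the wrapped function, to derive the return type) and `.attrname` (the attribute name, to build the context-replacement mapping). A minimal sketch, independent of the ucx codebase:

from functools import cached_property
from typing import get_type_hints


class _Ctx:
    @cached_property
    def query_linter(self) -> int:  # stand-in return type
        return 42


prop = _Ctx.query_linter  # class access returns the descriptor itself
assert get_type_hints(prop.func)["return"] is int
assert prop.attrname == "query_linter"
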
104 changes: 102 additions & 2 deletions tests/unit/source_code/test_directfs_access.py
@@ -1,9 +1,14 @@
+import datetime as dt
 from unittest.mock import create_autospec
 
 
+import pytest
 from databricks.labs.lsql.backends import MockBackend
+from databricks.labs.lsql.core import Row
 
+from databricks.labs.ucx.__about__ import __version__ as ucx_version
 from databricks.labs.ucx.framework.owners import AdministratorLocator
+from databricks.labs.ucx.progress.history import HistoryLog
 from databricks.labs.ucx.source_code.base import LineageAtom
 from databricks.labs.ucx.source_code.directfs_access import (
     DirectFsAccessCrawler,
@@ -12,7 +17,7 @@
 )
 
 
-def test_crawler_appends_dfsas() -> None:
+def test_crawler_replaces_dfsas() -> None:
     backend = MockBackend()
     crawler = DirectFsAccessCrawler.for_paths(backend, "schema")
     existing = list(crawler.snapshot())
@@ -32,7 +37,7 @@ def test_crawler_appends_dfsas() -> None:
         for path in ("a", "b", "c")
     )
     crawler.dump_all(dfsas)
-    rows = backend.rows_written_for(crawler.full_name, "append")
+    rows = backend.rows_written_for(crawler.full_name, "overwrite")
     assert len(rows) == 3
 
 
@@ -47,3 +52,98 @@ def test_directfs_access_ownership() -> None:
 
     assert owner == "an_admin"
     admin_locator.get_workspace_administrator.assert_called_once()
+
+
+@pytest.mark.parametrize(
+    "directfs_access_record,history_record",
+    (
+        (
+            DirectFsAccess(
+                source_id="an_id",
+                source_timestamp=dt.datetime(
+                    year=2024, month=10, day=21, hour=14, minute=45, second=10, tzinfo=dt.timezone.utc
+                ),
+                source_lineage=[
+                    LineageAtom(object_type="WORKFLOW", object_id="a", other={"foo": "bar", "baz": "daz"}),
+                    LineageAtom(object_type="FILE", object_id="b"),
+                    LineageAtom(object_type="NOTEBOOK", object_id="c", other={}),
+                ],
+                assessment_start_timestamp=dt.datetime(
+                    year=2024, month=10, day=22, hour=13, minute=45, second=10, tzinfo=dt.timezone.utc
+                ),
+                assessment_end_timestamp=dt.datetime(
+                    year=2024, month=10, day=22, hour=14, minute=45, second=10, tzinfo=dt.timezone.utc
+                ),
+                path="/path",
+                is_read=True,
+                is_write=False,
+            ),
+            Row(
+                workspace_id=2,
+                job_run_id=1,
+                object_type="DirectFsAccess",
+                object_id=["an_id"],
+                data={
+                    "source_id": "an_id",
+                    "source_timestamp": "2024-10-21T14:45:10Z",
+                    "source_lineage": '[{"object_type":"WORKFLOW","object_id":"a","other":{"foo":"bar","baz":"daz"}},{"object_type":"FILE","object_id":"b","other":null},{"object_type":"NOTEBOOK","object_id":"c","other":{}}]',
+                    "assessment_start_timestamp": "2024-10-22T13:45:10Z",
+                    "assessment_end_timestamp": "2024-10-22T14:45:10Z",
+                    "path": "/path",
+                    "is_read": "true",
+                    "is_write": "false",
+                },
+                failures=[],
+                owner="user@domain",
+                ucx_version=ucx_version,
+            ),
+        ),
+        (
+            DirectFsAccess(
+                source_id="the_id",
+                source_timestamp=dt.datetime(
+                    year=2024, month=10, day=21, hour=14, minute=45, second=10, tzinfo=dt.timezone.utc
+                ),
+                source_lineage=[],
+            ),
+            Row(
+                workspace_id=2,
+                job_run_id=1,
+                object_type="DirectFsAccess",
+                object_id=["the_id"],
+                data={
+                    "source_id": "the_id",
+                    "source_timestamp": "2024-10-21T14:45:10Z",
+                    "source_lineage": "[]",
+                    "assessment_start_timestamp": "1970-01-01T00:00:00Z",
+                    "assessment_end_timestamp": "1970-01-01T00:00:00Z",
+                    "path": "unknown",
+                    "is_read": "false",
+                    "is_write": "false",
+                },
+                failures=[],
+                owner="user@domain",
+                ucx_version=ucx_version,
+            ),
+        ),
+    ),
+)
+def test_directfs_supports_history(mock_backend, directfs_access_record: DirectFsAccess, history_record: Row) -> None:
+    """Verify that DirectFsAccess records are written as expected to the history log."""
+    admin_locator = create_autospec(AdministratorLocator)
+    admin_locator.get_workspace_administrator.return_value = "user@domain"
+    directfs_access_ownership = DirectFsAccessOwnership(admin_locator)
+    history_log = HistoryLog[DirectFsAccess](
+        mock_backend,
+        directfs_access_ownership,
+        DirectFsAccess,
+        run_id=1,
+        workspace_id=2,
+        catalog="a_catalog",
+    )
+
+    history_log.append_inventory_snapshot([directfs_access_record])
+
+    rows = mock_backend.rows_written_for("`a_catalog`.`multiworkspace`.`historical`", mode="append")
+
+    assert rows == [history_record]