diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index 24132afd1f..f847864945 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -6,6 +6,8 @@ from functools import cached_property from pathlib import Path +from databricks.labs.ucx.source_code.base import DirectFsAccess + from databricks.labs.blueprint.installation import Installation from databricks.labs.blueprint.installer import InstallState from databricks.labs.blueprint.tui import Prompts @@ -17,7 +19,7 @@ from databricks.labs.ucx.recon.metadata_retriever import DatabricksTableMetadataRetriever from databricks.labs.ucx.recon.migration_recon import MigrationRecon from databricks.labs.ucx.recon.schema_comparator import StandardSchemaComparator -from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler +from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler, DirectFsAccessOwnership from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler from databricks.sdk import AccountClient, WorkspaceClient, core @@ -28,7 +30,7 @@ from databricks.labs.ucx.assessment.export import AssessmentExporter from databricks.labs.ucx.aws.credentials import CredentialManager from databricks.labs.ucx.config import WorkspaceConfig -from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership +from databricks.labs.ucx.framework.owners import AdministratorLocator, Ownership, WorkspacePathOwnership from databricks.labs.ucx.hive_metastore import ExternalLocations, MountsCrawler, TablesCrawler from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema from databricks.labs.ucx.hive_metastore.grants import ( @@ -503,6 +505,10 @@ def directfs_access_crawler_for_paths(self) -> DirectFsAccessCrawler: def directfs_access_crawler_for_queries(self) -> DirectFsAccessCrawler: return DirectFsAccessCrawler.for_queries(self.sql_backend, self.inventory_database) + @cached_property + def directfs_access_ownership(self) -> Ownership[DirectFsAccess]: + return DirectFsAccessOwnership(self.administrator_locator) + @cached_property def used_tables_crawler_for_paths(self): return UsedTablesCrawler.for_paths(self.sql_backend, self.inventory_database) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 81a6ec7497..3c24f260dc 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -1,6 +1,8 @@ from functools import cached_property from pathlib import Path +from databricks.labs.ucx.source_code.base import DirectFsAccess + from databricks.labs.blueprint.installation import Installation from databricks.labs.lsql.backends import RuntimeBackend, SqlBackend from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus @@ -182,6 +184,17 @@ def historical_cluster_policies_log(self) -> HistoryLog[PolicyInfo]: self.config.ucx_catalog, ) + @cached_property + def historical_directfs_access_log(self) -> HistoryLog[DirectFsAccess]: + return HistoryLog( + self.sql_backend, + self.directfs_access_ownership, + DirectFsAccess, + int(self.named_parameters["parent_run_id"]), + self.workspace_id, + self.config.ucx_catalog, + ) + @cached_property def historical_grants_log(self) -> HistoryLog[Grant]: return HistoryLog( diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index f7c4909556..1dd5efa68a 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -1,6 +1,7 @@ from __future__ import annotations import dataclasses import datetime as dt +import typing from enum import Enum, EnumMeta import json import logging @@ -100,7 +101,13 @@ def _get_field_names_with_types(cls, klass: type[Record]) -> tuple[dict[str, typ - A dictionary of fields to include in the object data, and their type. - The type of the failures field, if present. """ - field_names_with_types = {field.name: field.type for field in dataclasses.fields(klass)} + # Ignore the field types returned by dataclasses.fields(): it doesn't resolve string-based annotations (which + # are produced automatically in a __future__.__annotations__ context). Unfortunately the dataclass mechanism + # captures the type hints prior to resolution (which happens later in the class initialization process). + # As such, we rely on dataclasses.fields() for the set of field names, but not the types which we fetch directly. + klass_type_hints = typing.get_type_hints(klass) + field_names = [field.name for field in dataclasses.fields(klass)] + field_names_with_types = {field_name: klass_type_hints[field_name] for field_name in field_names} if "failures" not in field_names_with_types: failures_type = None else: diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index e256af2fa3..0405bb021a 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -146,19 +146,25 @@ def refresh_table_migration_status(self, ctx: RuntimeContext) -> None: migration_status_snapshot = ctx.migration_status_refresher.snapshot(force_refresh=True) history_log.append_inventory_snapshot(migration_status_snapshot) - @job_task(depends_on=[verify_prerequisites]) + @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") def assess_dashboards(self, ctx: RuntimeContext): """Scans all dashboards for migration issues in SQL code of embedded widgets. Also stores direct filesystem accesses for display in the migration dashboard.""" - # TODO: Ensure these are captured in the history log. + history_log = ctx.historical_directfs_access_log ctx.query_linter.refresh_report(ctx.sql_backend, ctx.inventory_database) + directfs_access_snapshot = ctx.directfs_access_crawler_for_queries.snapshot() + # Note: The object-type is DirectFsAccess, the same as the workflow version. + history_log.append_inventory_snapshot(directfs_access_snapshot) - @job_task(depends_on=[verify_prerequisites]) + @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") def assess_workflows(self, ctx: RuntimeContext): """Scans all jobs for migration issues in notebooks. Also stores direct filesystem accesses for display in the migration dashboard.""" - # TODO: Ensure these are captured in the history log. + history_log = ctx.historical_directfs_access_log ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database) + directfs_access_snapshot = ctx.directfs_access_crawler_for_paths.snapshot() + # Note: the object-type is DirectFsAccess, the same as the query version. + history_log.append_inventory_snapshot(directfs_access_snapshot) @job_task( depends_on=[ @@ -167,7 +173,9 @@ def assess_workflows(self, ctx: RuntimeContext): crawl_udfs, assess_jobs, assess_clusters, + assess_dashboards, assess_pipelines, + assess_workflows, crawl_cluster_policies, refresh_table_migration_status, update_tables_history_log, diff --git a/src/databricks/labs/ucx/source_code/base.py b/src/databricks/labs/ucx/source_code/base.py index 85f5b598f6..9ec1de886b 100644 --- a/src/databricks/labs/ucx/source_code/base.py +++ b/src/databricks/labs/ucx/source_code/base.py @@ -5,12 +5,13 @@ import io import logging import sys +import warnings from abc import abstractmethod, ABC from collections.abc import Iterable from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path -from typing import Any, BinaryIO, TextIO +from typing import Any, BinaryIO, TextIO, ClassVar from astroid import NodeNG # type: ignore from sqlglot import Expression, parse as parse_sql @@ -190,15 +191,25 @@ def from_dict(cls, data: dict[str, Any]) -> Self: if isinstance(source_lineage, list) and len(source_lineage) > 0 and isinstance(source_lineage[0], dict): lineage_atoms = [LineageAtom(**lineage) for lineage in source_lineage] data["source_lineage"] = lineage_atoms + # Some LSQL backends return naive datetime instances; work around downstream issues by attaching UTC. + for field_name in ("source_timestamp", "assessment_start_timestamp", "assessment_end_timestamp"): + value = data.get(field_name, None) + if value is None or value.tzinfo is not None: + continue + warnings.warn(f"Naive datetime detected; should have time-zone associated: {field_name}") + data[field_name] = value.replace(tzinfo=timezone.utc) return cls(**data) - UNKNOWN = "unknown" + UNKNOWN: ClassVar[str] = "unknown" + NO_TS: ClassVar[datetime] = datetime.fromtimestamp(0, tz=timezone.utc) source_id: str = UNKNOWN - source_timestamp: datetime = datetime.fromtimestamp(0) + source_timestamp: datetime = NO_TS source_lineage: list[LineageAtom] = field(default_factory=list) - assessment_start_timestamp: datetime = datetime.fromtimestamp(0) - assessment_end_timestamp: datetime = datetime.fromtimestamp(0) + assessment_start_timestamp: datetime = NO_TS + assessment_end_timestamp: datetime = NO_TS + + __id_attributes__: ClassVar[tuple[str, ...]] = ("source_id",) def replace_source( self, diff --git a/src/databricks/labs/ucx/source_code/directfs_access.py b/src/databricks/labs/ucx/source_code/directfs_access.py index c7e16cad2f..ad6bd81ecf 100644 --- a/src/databricks/labs/ucx/source_code/directfs_access.py +++ b/src/databricks/labs/ucx/source_code/directfs_access.py @@ -40,8 +40,7 @@ def dump_all(self, dfsas: Sequence[DirectFsAccess]) -> None: Providing a multi-entity crawler is out-of-scope of this PR """ try: - # TODO until we historize data, we append all DFSAs - self._update_snapshot(dfsas, mode="append") + self._update_snapshot(dfsas, mode="overwrite") except DatabricksError as e: logger.error("Failed to store DFSAs", exc_info=e) diff --git a/tests/unit/progress/test_history.py b/tests/unit/progress/test_history.py index afa2e406d6..01d6284023 100644 --- a/tests/unit/progress/test_history.py +++ b/tests/unit/progress/test_history.py @@ -118,7 +118,7 @@ def test_historical_encoder_object_id(ownership) -> None: class _CompoundKey: a_field: str = "field-a" b_field: str = "field-b" - c_field: str = "field-c" + c_field: "str" = "field-c" # Annotations can be strings as well. @property def d_property(self) -> str: @@ -270,7 +270,7 @@ def test_historical_encoder_object_data_values_strings_as_is(ownership) -> None: @dataclass class _AClass: a_field: str = "value" - existing_json_field: str = "[1, 2, 3]" + existing_json_field: "str" = "[1, 2, 3]" optional_string_field: str | None = "value" __id_attributes__: ClassVar = ("a_field",) @@ -481,7 +481,7 @@ class _BrokenFailures2: __id_attributes__: ClassVar = ("a_field",) -@pytest.mark.parametrize("klass,broken_type", ((_BrokenFailures1, list[int]), (_BrokenFailures2, None))) +@pytest.mark.parametrize("klass,broken_type", ((_BrokenFailures1, list[int]), (_BrokenFailures2, type(None)))) def test_historical_encoder_failures_verification( ownership, klass: type[DataclassWithIdAttributes], diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py index 555eb49a04..87d2ccd4f2 100644 --- a/tests/unit/progress/test_workflows.py +++ b/tests/unit/progress/test_workflows.py @@ -72,18 +72,42 @@ def test_migration_progress_runtime_tables_refresh(run_workflow) -> None: @pytest.mark.parametrize( - "task, linter", + "task, linter, crawler, history_log", ( - (MigrationProgress.assess_dashboards, RuntimeContext.query_linter), - (MigrationProgress.assess_workflows, RuntimeContext.workflow_linter), + ( + MigrationProgress.assess_dashboards, + RuntimeContext.query_linter, + RuntimeContext.directfs_access_crawler_for_queries, + RuntimeContext.historical_directfs_access_log, + ), + ( + MigrationProgress.assess_workflows, + RuntimeContext.workflow_linter, + RuntimeContext.directfs_access_crawler_for_paths, + RuntimeContext.historical_directfs_access_log, + ), ), ) -def test_linter_runtime_refresh(run_workflow, task, linter) -> None: +def test_linter_runtime_refresh(run_workflow, task, linter, crawler, history_log) -> None: linter_class = get_type_hints(linter.func)["return"] + crawler_class = get_type_hints(crawler.func)["return"] mock_linter = create_autospec(linter_class) + mock_crawler = create_autospec(crawler_class) + mock_history_log = create_autospec(HistoryLog) linter_name = linter.attrname - ctx = run_workflow(task, **{linter_name: mock_linter}) + crawler_name = crawler.attrname + history_log_name = history_log.attrname + context_replacements = { + linter_name: mock_linter, + crawler_name: mock_crawler, + history_log_name: mock_history_log, + "named_parameters": {"parent_run_id": 53}, + } + ctx = run_workflow(task, **context_replacements) + mock_linter.refresh_report.assert_called_once_with(ctx.sql_backend, ctx.inventory_database) + mock_crawler.snapshot.assert_called_once_with() + mock_history_log.append_inventory_snapshot.assert_called_once() def test_migration_progress_with_valid_prerequisites(run_workflow) -> None: diff --git a/tests/unit/source_code/test_directfs_access.py b/tests/unit/source_code/test_directfs_access.py index c00c1cdcc6..4e495ad40e 100644 --- a/tests/unit/source_code/test_directfs_access.py +++ b/tests/unit/source_code/test_directfs_access.py @@ -1,9 +1,14 @@ import datetime as dt from unittest.mock import create_autospec + +import pytest from databricks.labs.lsql.backends import MockBackend +from databricks.labs.lsql.core import Row +from databricks.labs.ucx.__about__ import __version__ as ucx_version from databricks.labs.ucx.framework.owners import AdministratorLocator +from databricks.labs.ucx.progress.history import HistoryLog from databricks.labs.ucx.source_code.base import LineageAtom from databricks.labs.ucx.source_code.directfs_access import ( DirectFsAccessCrawler, @@ -12,7 +17,7 @@ ) -def test_crawler_appends_dfsas() -> None: +def test_crawler_replaces_dfsas() -> None: backend = MockBackend() crawler = DirectFsAccessCrawler.for_paths(backend, "schema") existing = list(crawler.snapshot()) @@ -32,7 +37,7 @@ def test_crawler_appends_dfsas() -> None: for path in ("a", "b", "c") ) crawler.dump_all(dfsas) - rows = backend.rows_written_for(crawler.full_name, "append") + rows = backend.rows_written_for(crawler.full_name, "overwrite") assert len(rows) == 3 @@ -47,3 +52,98 @@ def test_directfs_access_ownership() -> None: assert owner == "an_admin" admin_locator.get_workspace_administrator.assert_called_once() + + +@pytest.mark.parametrize( + "directfs_access_record,history_record", + ( + ( + DirectFsAccess( + source_id="an_id", + source_timestamp=dt.datetime( + year=2024, month=10, day=21, hour=14, minute=45, second=10, tzinfo=dt.timezone.utc + ), + source_lineage=[ + LineageAtom(object_type="WORKFLOW", object_id="a", other={"foo": "bar", "baz": "daz"}), + LineageAtom(object_type="FILE", object_id="b"), + LineageAtom(object_type="NOTEBOOK", object_id="c", other={}), + ], + assessment_start_timestamp=dt.datetime( + year=2024, month=10, day=22, hour=13, minute=45, second=10, tzinfo=dt.timezone.utc + ), + assessment_end_timestamp=dt.datetime( + year=2024, month=10, day=22, hour=14, minute=45, second=10, tzinfo=dt.timezone.utc + ), + path="/path", + is_read=True, + is_write=False, + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="DirectFsAccess", + object_id=["an_id"], + data={ + "source_id": "an_id", + "source_timestamp": "2024-10-21T14:45:10Z", + "source_lineage": '[{"object_type":"WORKFLOW","object_id":"a","other":{"foo":"bar","baz":"daz"}},{"object_type":"FILE","object_id":"b","other":null},{"object_type":"NOTEBOOK","object_id":"c","other":{}}]', + "assessment_start_timestamp": "2024-10-22T13:45:10Z", + "assessment_end_timestamp": "2024-10-22T14:45:10Z", + "path": "/path", + "is_read": "true", + "is_write": "false", + }, + failures=[], + owner="user@domain", + ucx_version=ucx_version, + ), + ), + ( + DirectFsAccess( + source_id="the_id", + source_timestamp=dt.datetime( + year=2024, month=10, day=21, hour=14, minute=45, second=10, tzinfo=dt.timezone.utc + ), + source_lineage=[], + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="DirectFsAccess", + object_id=["the_id"], + data={ + "source_id": "the_id", + "source_timestamp": "2024-10-21T14:45:10Z", + "source_lineage": "[]", + "assessment_start_timestamp": "1970-01-01T00:00:00Z", + "assessment_end_timestamp": "1970-01-01T00:00:00Z", + "path": "unknown", + "is_read": "false", + "is_write": "false", + }, + failures=[], + owner="user@domain", + ucx_version=ucx_version, + ), + ), + ), +) +def test_directfs_supports_history(mock_backend, directfs_access_record: DirectFsAccess, history_record: Row) -> None: + """Verify that DirectFsAccess records are written as expected to the history log.""" + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "user@domain" + directfs_access_ownership = DirectFsAccessOwnership(admin_locator) + history_log = HistoryLog[DirectFsAccess]( + mock_backend, + directfs_access_ownership, + DirectFsAccess, + run_id=1, + workspace_id=2, + catalog="a_catalog", + ) + + history_log.append_inventory_snapshot([directfs_access_record]) + + rows = mock_backend.rows_written_for("`a_catalog`.`multiworkspace`.`historical`", mode="append") + + assert rows == [history_record]