diff --git a/src/databricks/labs/ucx/assessment/clusters.py b/src/databricks/labs/ucx/assessment/clusters.py index 0e0624d3c2..e31284a1bb 100644 --- a/src/databricks/labs/ucx/assessment/clusters.py +++ b/src/databricks/labs/ucx/assessment/clusters.py @@ -3,6 +3,7 @@ import logging from collections.abc import Iterable from dataclasses import dataclass +from typing import ClassVar from databricks.labs.lsql.backends import SqlBackend from databricks.sdk import WorkspaceClient @@ -46,6 +47,8 @@ class ClusterInfo: creator: str | None = None """User-name of the creator of the cluster, if known.""" + __id_attributes__: ClassVar[tuple[str, ...]] = ("cluster_id",) + class CheckClusterMixin(CheckInitScriptMixin): _ws: WorkspaceClient @@ -203,6 +206,8 @@ class PolicyInfo: creator: str | None = None """User-name of the creator of the cluster policy, if known.""" + __id_attributes__: ClassVar[tuple[str, ...]] = ("policy_id",) + class PoliciesCrawler(CrawlerBase[PolicyInfo], CheckClusterMixin): def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): diff --git a/src/databricks/labs/ucx/assessment/jobs.py b/src/databricks/labs/ucx/assessment/jobs.py index 667647d967..924b33bd9f 100644 --- a/src/databricks/labs/ucx/assessment/jobs.py +++ b/src/databricks/labs/ucx/assessment/jobs.py @@ -4,6 +4,7 @@ from dataclasses import dataclass from datetime import datetime, timedelta, timezone from hashlib import sha256 +from typing import ClassVar from databricks.labs.lsql.backends import SqlBackend from databricks.sdk import WorkspaceClient @@ -40,6 +41,8 @@ class JobInfo: creator: str | None = None """User-name of the creator of the pipeline, if known.""" + __id_attributes__: ClassVar[tuple[str, ...]] = ("job_id",) + class JobsMixin: @classmethod diff --git a/src/databricks/labs/ucx/assessment/pipelines.py b/src/databricks/labs/ucx/assessment/pipelines.py index 75cf643ddd..40163b7dff 100644 --- a/src/databricks/labs/ucx/assessment/pipelines.py +++ b/src/databricks/labs/ucx/assessment/pipelines.py @@ -2,6 +2,7 @@ import logging from collections.abc import Iterable from dataclasses import dataclass +from typing import ClassVar from databricks.labs.lsql.backends import SqlBackend from databricks.sdk import WorkspaceClient @@ -24,6 +25,8 @@ class PipelineInfo: creator_name: str | None = None """User-name of the creator of the pipeline, if known.""" + __id_attributes__: ClassVar[tuple[str, ...]] = ("pipeline_id",) + class PipelinesCrawler(CrawlerBase[PipelineInfo], CheckClusterMixin): def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index 88640988de..b21bdff4fe 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -38,17 +38,19 @@ ComputeLocations, Grant, GrantsCrawler, + GrantOwnership, MigrateGrants, PrincipalACL, ) from databricks.labs.ucx.hive_metastore.mapping import TableMapping -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationOwnership from databricks.labs.ucx.hive_metastore.table_migrate import ( TableMigrationStatusRefresher, TablesMigrator, ) from databricks.labs.ucx.hive_metastore.table_move import TableMove -from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler +from databricks.labs.ucx.hive_metastore.tables import TableOwnership +from 
databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership from databricks.labs.ucx.hive_metastore.verification import VerifyHasCatalog, VerifyHasMetastore from databricks.labs.ucx.installer.workflows import DeployedWorkflows from databricks.labs.ucx.progress.install import VerifyProgressTracking @@ -243,14 +245,26 @@ def group_manager(self) -> GroupManager: def grants_crawler(self) -> GrantsCrawler: return GrantsCrawler(self.tables_crawler, self.udfs_crawler, self.config.include_databases) + @cached_property + def grant_ownership(self) -> GrantOwnership: + return GrantOwnership(self.administrator_locator) + @cached_property def udfs_crawler(self) -> UdfsCrawler: return UdfsCrawler(self.sql_backend, self.inventory_database, self.config.include_databases) + @cached_property + def udf_ownership(self) -> UdfOwnership: + return UdfOwnership(self.administrator_locator) + @cached_property def tables_crawler(self) -> TablesCrawler: return TablesCrawler(self.sql_backend, self.inventory_database, self.config.include_databases) + @cached_property + def table_ownership(self) -> TableOwnership: + return TableOwnership(self.administrator_locator) + @cached_property def tables_migrator(self) -> TablesMigrator: return TablesMigrator( @@ -363,6 +377,10 @@ def migration_status_refresher(self) -> TableMigrationStatusRefresher: self.tables_crawler, ) + @cached_property + def table_migration_ownership(self) -> TableMigrationOwnership: + return TableMigrationOwnership(self.tables_crawler, self.table_ownership) + @cached_property def iam_credential_manager(self) -> CredentialManager: return CredentialManager(self.workspace_client) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 43705a3272..81a6ec7497 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -3,21 +3,35 @@ from databricks.labs.blueprint.installation import Installation from databricks.labs.lsql.backends import RuntimeBackend, SqlBackend +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus from databricks.sdk import WorkspaceClient, core from databricks.labs.ucx.__about__ import __version__ -from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler +from databricks.labs.ucx.assessment.clusters import ( + ClustersCrawler, + PoliciesCrawler, + ClusterOwnership, + ClusterInfo, + ClusterPolicyOwnership, + PolicyInfo, +) from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptCrawler -from databricks.labs.ucx.assessment.jobs import JobsCrawler, SubmitRunsCrawler -from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler +from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo, JobsCrawler, SubmitRunsCrawler +from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler, PipelineInfo, PipelineOwnership from databricks.labs.ucx.config import WorkspaceConfig from databricks.labs.ucx.contexts.application import GlobalContext from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler +from databricks.labs.ucx.hive_metastore.grants import Grant from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler -from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler +from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table +from databricks.labs.ucx.hive_metastore.udfs import Udf from databricks.labs.ucx.installer.logs 
import TaskRunWarningRecorder +from databricks.labs.ucx.progress.history import HistoryLog from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder +# As with GlobalContext, service factories unavoidably have a lot of public methods. +# pylint: disable=too-many-public-methods + class RuntimeContext(GlobalContext): @cached_property @@ -54,6 +68,10 @@ def installation(self) -> Installation: def jobs_crawler(self) -> JobsCrawler: return JobsCrawler(self.workspace_client, self.sql_backend, self.inventory_database) + @cached_property + def job_ownership(self) -> JobOwnership: + return JobOwnership(self.administrator_locator) + @cached_property def submit_runs_crawler(self) -> SubmitRunsCrawler: return SubmitRunsCrawler( @@ -67,10 +85,18 @@ def submit_runs_crawler(self) -> SubmitRunsCrawler: def clusters_crawler(self) -> ClustersCrawler: return ClustersCrawler(self.workspace_client, self.sql_backend, self.inventory_database) + @cached_property + def cluster_ownership(self) -> ClusterOwnership: + return ClusterOwnership(self.administrator_locator) + @cached_property def pipelines_crawler(self) -> PipelinesCrawler: return PipelinesCrawler(self.workspace_client, self.sql_backend, self.inventory_database) + @cached_property + def pipeline_ownership(self) -> PipelineOwnership: + return PipelineOwnership(self.administrator_locator) + @cached_property def table_size_crawler(self) -> TableSizeCrawler: return TableSizeCrawler(self.tables_crawler) @@ -79,12 +105,18 @@ def table_size_crawler(self) -> TableSizeCrawler: def policies_crawler(self) -> PoliciesCrawler: return PoliciesCrawler(self.workspace_client, self.sql_backend, self.inventory_database) + @cached_property + def cluster_policy_ownership(self) -> ClusterPolicyOwnership: + return ClusterPolicyOwnership(self.administrator_locator) + @cached_property def global_init_scripts_crawler(self) -> GlobalInitScriptCrawler: return GlobalInitScriptCrawler(self.workspace_client, self.sql_backend, self.inventory_database) @cached_property def tables_crawler(self) -> TablesCrawler: + # Warning: Not all runtime contexts support the fast-scan implementation; it requires the JVM bridge to Spark + # and that's not always available. 
return FasterTableScanCrawler(self.sql_backend, self.inventory_database, self.config.include_databases) @cached_property @@ -116,10 +148,102 @@ def workflow_run_recorder(self) -> WorkflowRunRecorder: return WorkflowRunRecorder( self.sql_backend, self.config.ucx_catalog, - workspace_id=self.workspace_client.get_workspace_id(), + workspace_id=self.workspace_id, workflow_name=self.named_parameters["workflow"], workflow_id=int(self.named_parameters["job_id"]), workflow_run_id=int(self.named_parameters["parent_run_id"]), workflow_run_attempt=int(self.named_parameters.get("attempt", 0)), workflow_start_time=self.named_parameters["start_time"], ) + + @cached_property + def workspace_id(self) -> int: + return self.workspace_client.get_workspace_id() + + @cached_property + def historical_clusters_log(self) -> HistoryLog[ClusterInfo]: + return HistoryLog( + self.sql_backend, + self.cluster_ownership, + ClusterInfo, + int(self.named_parameters["parent_run_id"]), + self.workspace_id, + self.config.ucx_catalog, + ) + + @cached_property + def historical_cluster_policies_log(self) -> HistoryLog[PolicyInfo]: + return HistoryLog( + self.sql_backend, + self.cluster_policy_ownership, + PolicyInfo, + int(self.named_parameters["parent_run_id"]), + self.workspace_id, + self.config.ucx_catalog, + ) + + @cached_property + def historical_grants_log(self) -> HistoryLog[Grant]: + return HistoryLog( + self.sql_backend, + self.grant_ownership, + Grant, + int(self.named_parameters["parent_run_id"]), + self.workspace_id, + self.config.ucx_catalog, + ) + + @cached_property + def historical_jobs_log(self) -> HistoryLog[JobInfo]: + return HistoryLog( + self.sql_backend, + self.job_ownership, + JobInfo, + int(self.named_parameters["parent_run_id"]), + self.workspace_id, + self.config.ucx_catalog, + ) + + @cached_property + def historical_pipelines_log(self) -> HistoryLog[PipelineInfo]: + return HistoryLog( + self.sql_backend, + self.pipeline_ownership, + PipelineInfo, + int(self.named_parameters["parent_run_id"]), + self.workspace_id, + self.config.ucx_catalog, + ) + + @cached_property + def historical_tables_log(self) -> HistoryLog[Table]: + return HistoryLog( + self.sql_backend, + self.table_ownership, + Table, + int(self.named_parameters["parent_run_id"]), + self.workspace_id, + self.config.ucx_catalog, + ) + + @cached_property + def historical_table_migration_log(self) -> HistoryLog[TableMigrationStatus]: + return HistoryLog( + self.sql_backend, + self.table_migration_ownership, + TableMigrationStatus, + int(self.named_parameters["parent_run_id"]), + self.workspace_id, + self.config.ucx_catalog, + ) + + @cached_property + def historical_udfs_log(self) -> HistoryLog[Udf]: + return HistoryLog( + self.sql_backend, + self.udf_ownership, + Udf, + int(self.named_parameters["parent_run_id"]), + self.workspace_id, + self.config.ucx_catalog, + ) diff --git a/src/databricks/labs/ucx/framework/crawlers.py b/src/databricks/labs/ucx/framework/crawlers.py index 4c89cde902..034e738df1 100644 --- a/src/databricks/labs/ucx/framework/crawlers.py +++ b/src/databricks/labs/ucx/framework/crawlers.py @@ -159,6 +159,6 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool) self._update_snapshot(loaded_records, mode="overwrite") return loaded_records - def _update_snapshot(self, items: Sequence[Result], mode: Literal["append", "overwrite"] = "append") -> None: + def _update_snapshot(self, items: Sequence[Result], *, mode: Literal["append", "overwrite"]) -> None: logger.debug(f"[{self.full_name}] found {len(items)} 
new records for {self._table}") self._backend.save_table(self.full_name, items, self._klass, mode=mode) diff --git a/src/databricks/labs/ucx/hive_metastore/grants.py b/src/databricks/labs/ucx/hive_metastore/grants.py index 03cf53dcee..de9106ce40 100644 --- a/src/databricks/labs/ucx/hive_metastore/grants.py +++ b/src/databricks/labs/ucx/hive_metastore/grants.py @@ -3,7 +3,7 @@ from collections.abc import Callable, Iterable from dataclasses import dataclass, replace from functools import partial, cached_property -from typing import Protocol +from typing import ClassVar, Protocol from databricks.labs.blueprint.installation import Installation from databricks.labs.blueprint.parallel import ManyError, Threads @@ -66,6 +66,8 @@ class Grant: any_file: bool = False anonymous_function: bool = False + __id_attributes__: ClassVar[tuple[str, ...]] = ("object_type", "object_key", "action_type", "principal") + @staticmethod def type_and_key( *, @@ -105,6 +107,11 @@ def type_and_key( ) raise ValueError(msg) + @property + def object_type(self) -> str: + this_type, _ = self.this_type_and_key() + return this_type + @property def object_key(self) -> str: _, key = self.this_type_and_key() diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 26e6a5cb93..1a15be28f0 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -2,6 +2,7 @@ import logging from dataclasses import dataclass, replace from collections.abc import Iterable, KeysView +from typing import ClassVar from databricks.labs.lsql.backends import SqlBackend from databricks.sdk import WorkspaceClient @@ -25,6 +26,8 @@ class TableMigrationStatus: dst_table: str | None = None update_ts: str | None = None + __id_attributes__: ClassVar[tuple[str, ...]] = ("src_schema", "src_table") + def destination(self): return f"{self.dst_catalog}.{self.dst_schema}.{self.dst_table}".lower() diff --git a/src/databricks/labs/ucx/hive_metastore/tables.py b/src/databricks/labs/ucx/hive_metastore/tables.py index de8457503a..8f4adacde5 100644 --- a/src/databricks/labs/ucx/hive_metastore/tables.py +++ b/src/databricks/labs/ucx/hive_metastore/tables.py @@ -1,7 +1,7 @@ import logging import re import typing -from collections.abc import Iterable, Iterator, Collection +from collections.abc import Collection, Iterable, Iterator from dataclasses import dataclass from enum import Enum, auto from functools import cached_property, partial @@ -64,6 +64,8 @@ class Table: # pylint: disable=too-many-public-methods storage_properties: str | None = None is_partitioned: bool = False + __id_attributes__: typing.ClassVar[tuple[str, ...]] = ("catalog", "database", "name") + DBFS_ROOT_PREFIXES: typing.ClassVar[list[str]] = [ "/dbfs/", "dbfs:/", diff --git a/src/databricks/labs/ucx/hive_metastore/udfs.py b/src/databricks/labs/ucx/hive_metastore/udfs.py index 74196c543c..81ed350838 100644 --- a/src/databricks/labs/ucx/hive_metastore/udfs.py +++ b/src/databricks/labs/ucx/hive_metastore/udfs.py @@ -2,6 +2,7 @@ from collections.abc import Iterable from dataclasses import dataclass, replace from functools import partial +from typing import ClassVar from databricks.labs.blueprint.parallel import Threads from databricks.labs.lsql.backends import SqlBackend @@ -28,6 +29,12 @@ class Udf: # pylint: disable=too-many-instance-attributes comment: str success: int = 1 failures: str = "" + """A string that represents a 
problem associated with this UDF. + + Warning: unlike other inventory classes, this is not a JSON-encoded array but instead just a simple string. + """ + + __id_attributes__: ClassVar[tuple[str, ...]] = ("catalog", "database", "name") @property def key(self) -> str: diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py new file mode 100644 index 0000000000..f7c4909556 --- /dev/null +++ b/src/databricks/labs/ucx/progress/history.py @@ -0,0 +1,275 @@ +from __future__ import annotations +import dataclasses +import datetime as dt +from enum import Enum, EnumMeta +import json +import logging +from collections.abc import Iterable, Sequence +from typing import ClassVar, Protocol, TypeVar, Generic, Any, get_type_hints + +from databricks.labs.lsql.backends import SqlBackend + +from databricks.labs.ucx.framework.owners import Ownership +from databricks.labs.ucx.framework.utils import escape_sql_identifier +from databricks.labs.ucx.progress.install import Historical + +logger = logging.getLogger(__name__) + + +class DataclassWithIdAttributes(Protocol): + __dataclass_fields__: ClassVar[dict[str, Any]] + + __id_attributes__: ClassVar[tuple[str, ...]] + """The names of attributes (can be dataclass fields or ordinary properties) that make up the object identifier. + + All attributes must be (non-optional) strings. + """ + + +Record = TypeVar("Record", bound=DataclassWithIdAttributes) +T = TypeVar("T") + + +class HistoricalEncoder(Generic[Record]): + """An encoder for dataclasses that will be stored in our history log. + + Our history records are designed with several goals in mind: + - Records can be of different types, meaning we have to store heterogenous types using a homogenous schema. + - We partially shred the records to allow for easier SQL-based querying. + - Flexibility from the start, because schema changes in the future will be very difficult. + - Records will be shared (and queried) across workspaces with different versions of UCX installed. + + With this in mind: + - We have an `object-type` (discriminator) field for holding the type of the record we're storing. This allows for + type-specific logic when querying. + - We have a (type-specific) field for storing the business key (identifier) of each record. + - We have special handling for the `failures` attribute that is often present on the types we are storing. + - The `object-data` is an unconstrained key-value map, allowing arbitrary attributes to be stored and (if top-level) + queried directly. Complex values are JSON-encoded. + - The :py:class:`Ownership` mechanism is used to associate each record with a user. + - To help with forward and backward compatibility the UCX version used to encode a record is included in each + record. + - The associated workspace and job run is also included in each record. + - These records are currently stored in a (UC-based) table that is shared across workspaces attached to the same + metastore. They are used to help report on migration progress. 
+ """ + + _job_run_id: int + """The identifier of the current job run, with which these records are associated.""" + + _workspace_id: int + """The identifier of the current workspace, for identifying where these records came from.""" + + _ownership: Ownership[Record] + """Used to determine the owner for each record.""" + + _object_type: str + """The name of the record class being encoded by this instance.""" + + _field_names_with_types: dict[str, type] + """A map of the fields on instances of the record class (and their types); these will appear in the object data.""" + + _has_failures: type[str | list[str]] | None + """The type of the failures attribute for this record, if present.""" + + _id_attribute_names: Sequence[str] + """The names of the record attributes that are used to produce the identifier for each record. + + Attributes can be either a dataclass field or a property, for which the type must be a (non-optional) string. + """ + + def __init__(self, job_run_id: int, workspace_id: int, ownership: Ownership[Record], klass: type[Record]) -> None: + self._job_run_id = job_run_id + self._workspace_id = workspace_id + self._ownership = ownership + self._object_type = self._get_object_type(klass) + self._field_names_with_types, self._has_failures = self._get_field_names_with_types(klass) + self._id_attribute_names = self._get_id_attribute_names(klass) + + @classmethod + def _get_field_names_with_types(cls, klass: type[Record]) -> tuple[dict[str, type], type[str | list[str]] | None]: + """Return the dataclass-defined fields that the record type declares, and their associated types. + + If the record has a "failures" attribute this is treated specially: it is removed but we signal that it was + present. + + Arguments: + klass: The record type. + Returns: + A tuple containing: + - A dictionary of fields to include in the object data, and their type. + - The type of the failures field, if present. 
+ """ + field_names_with_types = {field.name: field.type for field in dataclasses.fields(klass)} + if "failures" not in field_names_with_types: + failures_type = None + else: + failures_type = field_names_with_types.pop("failures") + if failures_type not in (str, list[str]): + msg = f"Historical record {klass} has invalid 'failures' attribute of type: {failures_type}" + raise TypeError(msg) + return field_names_with_types, failures_type + + def _get_id_attribute_names(self, klazz: type[Record]) -> Sequence[str]: + id_attribute_names = tuple(klazz.__id_attributes__) + all_fields = self._field_names_with_types + for name in id_attribute_names: + id_attribute_type = all_fields.get(name, None) or self._detect_property_type(klazz, name) + if id_attribute_type is None: + raise AttributeError(name=name, obj=klazz) + if id_attribute_type != str: + msg = f"Historical record {klazz} has a non-string id attribute: {name} (type={id_attribute_type})" + raise TypeError(msg) + return id_attribute_names + + def _detect_property_type(self, klazz: type[Record], name: str) -> str | None: + maybe_property = getattr(klazz, name, None) + if maybe_property is None: + return None + if not isinstance(maybe_property, property): + msg = f"Historical record {klazz} declares an id attribute that is not a field or property: {name} (type={maybe_property})" + raise TypeError(msg) + property_getter = maybe_property.fget + if not property_getter: + msg = f"Historical record {klazz} has a non-readable property as an id attribute: {name}" + raise TypeError(msg) + type_hints = get_type_hints(property_getter) or {} + try: + return type_hints["return"] + except KeyError as e: + msg = f"Historical record {klazz} has a property with no type as an id attribute: {name}" + raise TypeError(msg) from e + + @classmethod + def _get_object_type(cls, klass: type[Record]) -> str: + return klass.__name__ + + @classmethod + def _as_dict(cls, record: Record) -> dict[str, Any]: + return dataclasses.asdict(record) + + def _object_id(self, record: Record) -> list[str]: + return [getattr(record, field) for field in self._id_attribute_names] + + @classmethod + def _encode_non_serializable(cls, name: str, value: Any) -> Any: + if isinstance(type(value), EnumMeta): + return cls._encode_enum(value) + if isinstance(value, dt.datetime): + return cls._encode_datetime(name, value) + if isinstance(value, dt.date): + return cls._encode_date(value) + if isinstance(value, set): + return cls._encode_set(value) + + msg = f"Cannot encode {type(value)} value in or within field {name}: {value!r}" + raise TypeError(msg) + + @staticmethod + def _encode_enum(value: Enum) -> str: + """Enums are encoded as a string containing their name.""" + return value.name + + @staticmethod + def _encode_datetime(name, value: dt.datetime) -> str: + """Timestamps are encoded in ISO format, with a 'Z' timezone specifier. Naive timestamps aren't allowed.""" + # Only allow tz-aware timestamps. + if value.tzinfo is None: + # Name refers to the outermost field, not necessarily a field on a (nested) dataclass. + msg = f"Timestamp without timezone not supported in or within field {name}: {value}" + raise ValueError(msg) + # Always store with 'Z'. 
+ ts_utc = value.astimezone(dt.timezone.utc) + return ts_utc.isoformat().replace("+00:00", "Z") + + @staticmethod + def _encode_date(value: dt.date) -> str: + """Dates are encoded in ISO format.""" + return value.isoformat() + + @staticmethod + def _encode_set(value: set[T]) -> list[T]: + """Sets are encoded as an array of the elements.""" + return list(value) + + def _encode_field_value(self, name: str, value: Any) -> str | None: + if value is None: + return None + value_type = self._field_names_with_types[name] + if value_type in (str, (str | None)): + if isinstance(value, str): + return value + msg = f"Invalid value for field {name}, not a string: {value!r}" + raise ValueError(msg) + encoded_value = json.dumps( + value, + allow_nan=False, + separators=(",", ":"), # More compact than default, which includes a space after the colon (:). + default=lambda o: self._encode_non_serializable(name, o), + ) + # Handle encoding substituted values that encode as just a string (eg. timestamps); we just return the string. + return json.loads(encoded_value) if encoded_value.startswith('"') else encoded_value + + def _object_data_and_failures(self, record: Record) -> tuple[dict[str, str], list[str]]: + record_values = self._as_dict(record) + encoded_fields = { + field: self._encode_field_value(field, record_values[field]) for field in self._field_names_with_types + } + # We must return a value: strings are mandatory (not optional) as the type. As such, optional fields need to be + # omitted from the data map if the value is None. + data = {k: v for k, v in encoded_fields.items() if v is not None} + if self._has_failures == list[str]: + failures = record_values["failures"] + elif self._has_failures == str: + raw_failures = record_values["failures"] + try: + failures = json.loads(raw_failures) if raw_failures else [] + except json.decoder.JSONDecodeError: + failures = [raw_failures] + else: + failures = [] + + return data, failures + + def to_historical(self, record: Record) -> Historical: + data, failures = self._object_data_and_failures(record) + return Historical( + workspace_id=self._workspace_id, + job_run_id=self._job_run_id, + object_type=self._object_type, + object_id=self._object_id(record), + data=data, + failures=failures, + owner=self._ownership.owner_of(record), + ) + + +class HistoryLog(Generic[Record]): + def __init__( + self, + sql_backend: SqlBackend, + ownership: Ownership[Record], + klass: type[Record], + run_id: int, + workspace_id: int, + catalog: str, + schema: str = "multiworkspace", + table: str = "historical", + ) -> None: + self._sql_backend = sql_backend + self._klass = klass + self._catalog = catalog + self._schema = schema + self._table = table + encoder = HistoricalEncoder(job_run_id=run_id, workspace_id=workspace_id, ownership=ownership, klass=klass) + self._encoder = encoder + + @property + def full_name(self) -> str: + return f"{self._catalog}.{self._schema}.{self._table}" + + def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None: + history_records = [self._encoder.to_historical(record) for record in snapshot] + logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") + # This is the only writer, and the mode is 'append'. This is documented as conflict-free. 
+ self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 299bc5deb2..e256af2fa3 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -23,26 +23,49 @@ class MigrationProgress(Workflow): def __init__(self) -> None: super().__init__('migration-progress-experimental') - @job_task + @job_task(job_cluster="table_migration") + def verify_prerequisites(self, ctx: RuntimeContext) -> None: + """Verify the prerequisites for running this job on the table migration cluster are fulfilled. + + We will wait up to 1 hour for the assessment run to finish if it is running or pending. + """ + ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) + + @job_task(job_cluster="tacl") + def setup_tacl(self, ctx: RuntimeContext): + """(Optimization) Allow the TACL job cluster to be started while we're verifying the prerequisites for + refreshing everything.""" + + @job_task(depends_on=[verify_prerequisites, setup_tacl], job_cluster="tacl") def crawl_tables(self, ctx: RuntimeContext) -> None: """Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such as _database name_, _table name_, _table type_, _table location_, etc., in the table named `$inventory_database.tables`. The metadata stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that cannot easily be migrated to Unity Catalog.""" + # The TACL cluster is not UC-enabled, so the snapshot cannot be written immediately to the history log. + # Step 1 of 2: Just refresh the inventory. ctx.tables_crawler.snapshot(force_refresh=True) - @job_task + @job_task(depends_on=[verify_prerequisites, crawl_tables], job_cluster="table_migration") + def update_tables_history_log(self, ctx: RuntimeContext) -> None: + """Update the history log with the latest tables inventory snapshot.""" + # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. + # Step 2 of 2: Assuming (due to depends-on) the inventory was refreshed, capture into the history log. + # WARNING: this will fail if the inventory is empty, because it will then try to perform a crawl. + history_log = ctx.historical_tables_log + tables_snapshot = ctx.tables_crawler.snapshot() + history_log.append_inventory_snapshot(tables_snapshot) + + @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") def crawl_udfs(self, ctx: RuntimeContext) -> None: """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the table named `$inventory_database.udfs`. 
This inventory is currently used when scanning securable objects for issues with grants that cannot be migrated to Unit Catalog.""" - ctx.udfs_crawler.snapshot(force_refresh=True) + history_log = ctx.historical_udfs_log + udfs_snapshot = ctx.udfs_crawler.snapshot(force_refresh=True) + history_log.append_inventory_snapshot(udfs_snapshot) - @job_task(job_cluster="tacl") - def setup_tacl(self, ctx: RuntimeContext) -> None: - """(Optimization) Starts `tacl` job cluster in parallel to crawling tables.""" - - @job_task(depends_on=[crawl_tables, crawl_udfs], job_cluster="tacl") + @job_task(depends_on=[verify_prerequisites, crawl_tables, crawl_udfs], job_cluster="table_migration") def crawl_grants(self, ctx: RuntimeContext) -> None: """Scans all securable objects for permissions that have been assigned: this include database-level permissions, as well permissions directly configured on objects in the (already gathered) table and UDF inventories. The @@ -51,9 +74,11 @@ def crawl_grants(self, ctx: RuntimeContext) -> None: Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table ACLs enabled and available for retrieval.""" - ctx.grants_crawler.snapshot(force_refresh=True) + history_log = ctx.historical_grants_log + grants_snapshot = ctx.grants_crawler.snapshot(force_refresh=True) + history_log.append_inventory_snapshot(grants_snapshot) - @job_task + @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") def assess_jobs(self, ctx: RuntimeContext) -> None: """Scans through all the jobs and identifies those that are not compatible with UC. The list of all the jobs is stored in the `$inventory.jobs` table. @@ -64,9 +89,11 @@ def assess_jobs(self, ctx: RuntimeContext) -> None: - Clusters with incompatible Spark config tags - Clusters referencing DBFS locations in one or more config options """ - ctx.jobs_crawler.snapshot(force_refresh=True) + history_log = ctx.historical_jobs_log + jobs_snapshot = ctx.jobs_crawler.snapshot(force_refresh=True) + history_log.append_inventory_snapshot(jobs_snapshot) - @job_task + @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") def assess_clusters(self, ctx: RuntimeContext) -> None: """Scan through all the clusters and identifies those that are not compatible with UC. The list of all the clusters is stored in the`$inventory.clusters` table. 
@@ -77,9 +104,11 @@ def assess_clusters(self, ctx: RuntimeContext) -> None: - Clusters with incompatible spark config tags - Clusters referencing DBFS locations in one or more config options """ - ctx.clusters_crawler.snapshot(force_refresh=True) + history_log = ctx.historical_clusters_log + clusters_snapshot = ctx.clusters_crawler.snapshot(force_refresh=True) + history_log.append_inventory_snapshot(clusters_snapshot) - @job_task + @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") def assess_pipelines(self, ctx: RuntimeContext) -> None: """This module scans through all the Pipelines and identifies those pipelines which has Azure Service Principals embedded (who has been given access to the Azure storage accounts via spark configurations) in the pipeline @@ -90,9 +119,11 @@ def assess_pipelines(self, ctx: RuntimeContext) -> None: Subsequently, a list of all the pipelines with matching configurations are stored in the `$inventory.pipelines` table.""" - ctx.pipelines_crawler.snapshot(force_refresh=True) + history_log = ctx.historical_pipelines_log + pipelines_snapshot = ctx.pipelines_crawler.snapshot(force_refresh=True) + history_log.append_inventory_snapshot(pipelines_snapshot) - @job_task + @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") def crawl_cluster_policies(self, ctx: RuntimeContext) -> None: """This module scans through all the Cluster Policies and get the necessary information @@ -101,48 +132,45 @@ def crawl_cluster_policies(self, ctx: RuntimeContext) -> None: Subsequently, a list of all the policies with matching configurations are stored in the `$inventory.policies` table.""" - ctx.policies_crawler.snapshot(force_refresh=True) - - @job_task(job_cluster="table_migration") - def setup_table_migration(self, ctx: RuntimeContext) -> None: - """(Optimization) Starts `table_migration` job cluster in parallel to crawling tables.""" - - @job_task(depends_on=[setup_table_migration], job_cluster="table_migration") - def verify_prerequisites(self, ctx: RuntimeContext) -> None: - """Verify the prerequisites for running this job on the table migration cluster are fulfilled. - - We will wait up to 1 hour for the assessment run to finish if it is running or pending. - """ - ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) + history_log = ctx.historical_cluster_policies_log + cluster_policies_snapshot = ctx.policies_crawler.snapshot(force_refresh=True) + history_log.append_inventory_snapshot(cluster_policies_snapshot) - @job_task(depends_on=[crawl_tables, verify_prerequisites], job_cluster="table_migration") + @job_task(depends_on=[verify_prerequisites, crawl_tables, verify_prerequisites], job_cluster="table_migration") def refresh_table_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not. The results of the scan are stored in the `$inventory.migration_status` inventory table. """ - ctx.migration_status_refresher.snapshot(force_refresh=True) + history_log = ctx.historical_table_migration_log + migration_status_snapshot = ctx.migration_status_refresher.snapshot(force_refresh=True) + history_log.append_inventory_snapshot(migration_status_snapshot) - @job_task + @job_task(depends_on=[verify_prerequisites]) def assess_dashboards(self, ctx: RuntimeContext): """Scans all dashboards for migration issues in SQL code of embedded widgets. 
Also stores direct filesystem accesses for display in the migration dashboard.""" + # TODO: Ensure these are captured in the history log. ctx.query_linter.refresh_report(ctx.sql_backend, ctx.inventory_database) - @job_task + @job_task(depends_on=[verify_prerequisites]) def assess_workflows(self, ctx: RuntimeContext): """Scans all jobs for migration issues in notebooks. Also stores direct filesystem accesses for display in the migration dashboard.""" + # TODO: Ensure these are captured in the history log. ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database) @job_task( depends_on=[ + verify_prerequisites, crawl_grants, + crawl_udfs, assess_jobs, assess_clusters, assess_pipelines, crawl_cluster_policies, refresh_table_migration_status, + update_tables_history_log, ], job_cluster="table_migration", ) diff --git a/tests/integration/progress/test_workflows.py b/tests/integration/progress/test_workflows.py index 39ed7c1407..c9e8ee0afa 100644 --- a/tests/integration/progress/test_workflows.py +++ b/tests/integration/progress/test_workflows.py @@ -32,5 +32,9 @@ def test_running_real_migration_progress_job(installation_ctx: MockInstallationC assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}" # Ensure that the migration-progress workflow populated the `workflow_runs` table. - query = f"SELECT 1 FROM {installation_ctx.ucx_catalog}.multiworkspace.workflow_runs" + query = f"SELECT 1 FROM {installation_ctx.ucx_catalog}.multiworkspace.workflow_runs LIMIT 1" assert any(installation_ctx.sql_backend.fetch(query)), f"No workflow run captured: {query}" + + # Ensure that the history file has records written to it. + query = f"SELECT 1 from {installation_ctx.ucx_catalog}.multiworkspace.historical LIMIT 1" + assert any(installation_ctx.sql_backend.fetch(query)), f"No snapshots captured to the history log: {query}" diff --git a/tests/unit/assessment/test_clusters.py b/tests/unit/assessment/test_clusters.py index c86c3f60f0..6a35d649dc 100644 --- a/tests/unit/assessment/test_clusters.py +++ b/tests/unit/assessment/test_clusters.py @@ -3,9 +3,11 @@ import pytest from databricks.labs.lsql.backends import MockBackend +from databricks.labs.lsql.core import Row from databricks.sdk.errors import DatabricksError, InternalError, NotFound from databricks.sdk.service.compute import ClusterDetails, Policy +from databricks.labs.ucx.__about__ import __version__ as ucx_version from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler from databricks.labs.ucx.assessment.clusters import ( ClustersCrawler, @@ -17,6 +19,7 @@ ) from databricks.labs.ucx.framework.crawlers import SqlBackend from databricks.labs.ucx.framework.owners import AdministratorLocator +from databricks.labs.ucx.progress.history import HistoryLog from .. 
import mock_workspace_client @@ -206,6 +209,76 @@ def test_cluster_owner_creator_unknown() -> None: admin_locator.get_workspace_administrator.assert_called_once() +@pytest.mark.parametrize( + "cluster_info_record,history_record", + ( + ( + ClusterInfo( + cluster_id="1234", + success=1, + failures="[]", + spark_version="3.5.3", + policy_id="4567", + cluster_name="the_cluster", + creator="user@domain", + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="ClusterInfo", + object_id=["1234"], + data={ + "cluster_id": "1234", + "success": "1", + "spark_version": "3.5.3", + "policy_id": "4567", + "cluster_name": "the_cluster", + "creator": "user@domain", + }, + failures=[], + owner="user@domain", + ucx_version=ucx_version, + ), + ), + ( + ClusterInfo(cluster_id="1234", success=0, failures='["a-failure", "another-failure"]'), + Row( + workspace_id=2, + job_run_id=1, + object_type="ClusterInfo", + object_id=["1234"], + data={ + "cluster_id": "1234", + "success": "0", + }, + failures=["a-failure", "another-failure"], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ), +) +def test_cluster_info_supports_history(mock_backend, cluster_info_record: ClusterInfo, history_record: Row) -> None: + """Verify that ClusterInfo records are written as expected to the history log.""" + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "the_admin" + cluster_ownership = ClusterOwnership(admin_locator) + history_log = HistoryLog[ClusterInfo]( + mock_backend, + cluster_ownership, + ClusterInfo, + run_id=1, + workspace_id=2, + catalog="a_catalog", + ) + + history_log.append_inventory_snapshot([cluster_info_record]) + + rows = mock_backend.rows_written_for("`a_catalog`.`multiworkspace`.`historical`", mode="append") + + assert rows == [history_record] + + def test_policy_crawler(): ws = mock_workspace_client( policy_ids=['single-user-with-spn', 'single-user-with-spn-policyid', 'single-user-with-spn-no-sparkversion'], @@ -292,3 +365,79 @@ def test_cluster_policy_owner_creator_unknown() -> None: assert owner == "an_admin" admin_locator.get_workspace_administrator.assert_called_once() + + +@pytest.mark.parametrize( + "policy_info_record,history_record", + ( + ( + PolicyInfo( + policy_id="1234", + policy_name="the_policy", + success=1, + failures="[]", + spark_version="3.5.3", + policy_description="a description of the policy", + creator="user@domain", + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="PolicyInfo", + object_id=["1234"], + data={ + "policy_id": "1234", + "policy_name": "the_policy", + "success": "1", + "spark_version": "3.5.3", + "policy_description": "a description of the policy", + "creator": "user@domain", + }, + failures=[], + owner="user@domain", + ucx_version=ucx_version, + ), + ), + ( + PolicyInfo( + policy_id="1234", + policy_name="the_policy", + success=0, + failures='["a_failure", "another_failure"]', + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="PolicyInfo", + object_id=["1234"], + data={ + "policy_id": "1234", + "policy_name": "the_policy", + "success": "0", + }, + failures=["a_failure", "another_failure"], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ), +) +def test_cluster_policy_supports_history(mock_backend, policy_info_record: PolicyInfo, history_record: Row) -> None: + """Verify that PolicyInfo records are written as expected to the history log.""" + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = 
"the_admin" + cluster_policy_ownership = ClusterPolicyOwnership(admin_locator) + history_log = HistoryLog[PolicyInfo]( + mock_backend, + cluster_policy_ownership, + PolicyInfo, + run_id=1, + workspace_id=2, + catalog="a_catalog", + ) + + history_log.append_inventory_snapshot([policy_info_record]) + + rows = mock_backend.rows_written_for("`a_catalog`.`multiworkspace`.`historical`", mode="append") + + assert rows == [history_record] diff --git a/tests/unit/assessment/test_jobs.py b/tests/unit/assessment/test_jobs.py index 8ec3e89077..9844e8c51d 100644 --- a/tests/unit/assessment/test_jobs.py +++ b/tests/unit/assessment/test_jobs.py @@ -2,8 +2,11 @@ import pytest from databricks.labs.lsql.backends import MockBackend +from databricks.labs.lsql.core import Row +from databricks.labs.ucx.progress.history import HistoryLog from databricks.sdk.service.jobs import BaseJob, JobSettings +from databricks.labs.ucx.__about__ import __version__ as ucx_version from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership, JobsCrawler, SubmitRunsCrawler from databricks.labs.ucx.framework.owners import AdministratorLocator @@ -62,7 +65,7 @@ def test_jobs_assessment_with_spn_cluster_no_job_tasks(): assert result_set[0].success == 1 -def test_pipeline_crawler_creator(): +def test_job_crawler_creator(): ws = mock_workspace_client() ws.jobs.list.return_value = ( BaseJob(job_id=1, settings=JobSettings(), creator_user_name=None), @@ -135,7 +138,7 @@ def test_job_run_crawler(jobruns_ids, cluster_ids, run_ids, failures): assert result[0].failures == failures -def test_pipeline_owner_creator() -> None: +def test_job_owner_creator() -> None: admin_locator = create_autospec(AdministratorLocator) ownership = JobOwnership(admin_locator) @@ -145,7 +148,7 @@ def test_pipeline_owner_creator() -> None: admin_locator.get_workspace_administrator.assert_not_called() -def test_pipeline_owner_creator_unknown() -> None: +def test_job_owner_creator_unknown() -> None: admin_locator = create_autospec(AdministratorLocator) admin_locator.get_workspace_administrator.return_value = "an_admin" @@ -154,3 +157,73 @@ def test_pipeline_owner_creator_unknown() -> None: assert owner == "an_admin" admin_locator.get_workspace_administrator.assert_called_once() + + +@pytest.mark.parametrize( + "job_info_record,history_record", + ( + ( + JobInfo( + job_id="1234", + success=1, + failures="[]", + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="JobInfo", + object_id=["1234"], + data={ + "job_id": "1234", + "success": "1", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + JobInfo( + job_id="1234", + job_name="the_job", + creator="user@domain", + success=0, + failures='["first-failure", "second-failure"]', + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="JobInfo", + object_id=["1234"], + data={ + "job_id": "1234", + "job_name": "the_job", + "creator": "user@domain", + "success": "0", + }, + failures=["first-failure", "second-failure"], + owner="user@domain", + ucx_version=ucx_version, + ), + ), + ), +) +def test_job_supports_history(mock_backend, job_info_record: JobInfo, history_record: Row) -> None: + """Verify that JobInfo records are written as expected to the history log.""" + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "the_admin" + job_ownership = JobOwnership(admin_locator) + history_log = HistoryLog[JobInfo]( + mock_backend, + job_ownership, + JobInfo, + run_id=1, + workspace_id=2, + catalog="a_catalog", + ) + + 
history_log.append_inventory_snapshot([job_info_record]) + + rows = mock_backend.rows_written_for("`a_catalog`.`multiworkspace`.`historical`", mode="append") + + assert rows == [history_record] diff --git a/tests/unit/assessment/test_pipelines.py b/tests/unit/assessment/test_pipelines.py index 9a637538c2..13c2424229 100644 --- a/tests/unit/assessment/test_pipelines.py +++ b/tests/unit/assessment/test_pipelines.py @@ -1,13 +1,17 @@ import logging from unittest.mock import create_autospec +import pytest from databricks.labs.lsql.backends import MockBackend +from databricks.labs.lsql.core import Row from databricks.sdk.service.pipelines import GetPipelineResponse, PipelineStateInfo, PipelineSpec from databricks.sdk.errors import ResourceDoesNotExist +from databricks.labs.ucx.__about__ import __version__ as ucx_version from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler from databricks.labs.ucx.assessment.pipelines import PipelineOwnership, PipelineInfo, PipelinesCrawler from databricks.labs.ucx.framework.owners import AdministratorLocator +from databricks.labs.ucx.progress.history import HistoryLog from .. import mock_workspace_client @@ -107,3 +111,69 @@ def test_pipeline_owner_creator_unknown() -> None: assert owner == "an_admin" admin_locator.get_workspace_administrator.assert_called_once() + + +@pytest.mark.parametrize( + "pipeline_info_record,history_record", + ( + ( + PipelineInfo( + pipeline_id="1234", + success=1, + failures="[]", + pipeline_name="a_pipeline", + creator_name="user@domain", + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="PipelineInfo", + object_id=["1234"], + data={ + "pipeline_id": "1234", + "success": "1", + "pipeline_name": "a_pipeline", + "creator_name": "user@domain", + }, + failures=[], + owner="user@domain", + ucx_version=ucx_version, + ), + ), + ( + PipelineInfo(pipeline_id="1234", success=0, failures='["a-failure", "b-failure"]'), + Row( + workspace_id=2, + job_run_id=1, + object_type="PipelineInfo", + object_id=["1234"], + data={ + "pipeline_id": "1234", + "success": "0", + }, + failures=["a-failure", "b-failure"], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ), +) +def test_pipeline_info_supports_history(mock_backend, pipeline_info_record: PipelineInfo, history_record: Row) -> None: + """Verify that PipelineInfo records are written as expected to the history log.""" + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "the_admin" + pipeline_ownership = PipelineOwnership(admin_locator) + history_log = HistoryLog[PipelineInfo]( + mock_backend, + pipeline_ownership, + PipelineInfo, + run_id=1, + workspace_id=2, + catalog="a_catalog", + ) + + history_log.append_inventory_snapshot([pipeline_info_record]) + + rows = mock_backend.rows_written_for("`a_catalog`.`multiworkspace`.`historical`", mode="append") + + assert rows == [history_record] diff --git a/tests/unit/hive_metastore/test_grants.py b/tests/unit/hive_metastore/test_grants.py index f4c568630c..5321606504 100644 --- a/tests/unit/hive_metastore/test_grants.py +++ b/tests/unit/hive_metastore/test_grants.py @@ -4,12 +4,15 @@ import pytest from databricks.labs.lsql.backends import MockBackend +from databricks.labs.lsql.core import Row +from databricks.labs.ucx.__about__ import __version__ as ucx_version from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.hive_metastore.catalog_schema import Catalog, Schema from databricks.labs.ucx.hive_metastore.grants 
import Grant, GrantsCrawler, MigrateGrants, GrantOwnership from databricks.labs.ucx.hive_metastore.tables import Table, TablesCrawler from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler +from databricks.labs.ucx.progress.history import HistoryLog from databricks.labs.ucx.workspace_access.groups import GroupManager @@ -654,3 +657,199 @@ def test_grant_owner() -> None: assert owner == "an_admin" admin_locator.get_workspace_administrator.assert_called_once() + + +@pytest.mark.parametrize( + "grant_record,history_record", + ( + ( + Grant(principal="user@domain", action_type="SELECT", catalog="main", database="foo", table="bar"), + Row( + workspace_id=2, + job_run_id=1, + object_type="Grant", + object_id=["TABLE", "main.foo.bar", "SELECT", "user@domain"], + data={ + "principal": "user@domain", + "action_type": "SELECT", + "catalog": "main", + "database": "foo", + "table": "bar", + "any_file": "false", + "anonymous_function": "false", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + Grant(principal="user@domain", action_type="SELECT", catalog="main", database="foo", view="bar"), + Row( + workspace_id=2, + job_run_id=1, + object_type="Grant", + object_id=["VIEW", "main.foo.bar", "SELECT", "user@domain"], + data={ + "principal": "user@domain", + "action_type": "SELECT", + "catalog": "main", + "database": "foo", + "view": "bar", + "any_file": "false", + "anonymous_function": "false", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + Grant(principal="user@domain", action_type="SELECT", catalog="main", database="foo", udf="bar"), + Row( + workspace_id=2, + job_run_id=1, + object_type="Grant", + object_id=["FUNCTION", "main.foo.bar", "SELECT", "user@domain"], + data={ + "principal": "user@domain", + "action_type": "SELECT", + "catalog": "main", + "database": "foo", + "udf": "bar", + "any_file": "false", + "anonymous_function": "false", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + Grant(principal="user@domain", action_type="SELECT", catalog="main", database="foo", udf="bar"), + Row( + workspace_id=2, + job_run_id=1, + object_type="Grant", + object_id=["FUNCTION", "main.foo.bar", "SELECT", "user@domain"], + data={ + "principal": "user@domain", + "action_type": "SELECT", + "catalog": "main", + "database": "foo", + "udf": "bar", + "any_file": "false", + "anonymous_function": "false", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + Grant(principal="user@domain", action_type="ALL_PRIVILEGES", catalog="main", database="foo"), + Row( + workspace_id=2, + job_run_id=1, + object_type="Grant", + object_id=["DATABASE", "main.foo", "ALL_PRIVILEGES", "user@domain"], + data={ + "principal": "user@domain", + "action_type": "ALL_PRIVILEGES", + "catalog": "main", + "database": "foo", + "any_file": "false", + "anonymous_function": "false", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + Grant(principal="user@domain", action_type="ALL_PRIVILEGES", catalog="main"), + Row( + workspace_id=2, + job_run_id=1, + object_type="Grant", + object_id=["CATALOG", "main", "ALL_PRIVILEGES", "user@domain"], + data={ + "principal": "user@domain", + "action_type": "ALL_PRIVILEGES", + "catalog": "main", + "any_file": "false", + "anonymous_function": "false", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + Grant(principal="user@domain", action_type="SELECT", any_file=True), + Row( + workspace_id=2, + job_run_id=1, + 
object_type="Grant", + object_id=["ANY FILE", "", "SELECT", "user@domain"], + data={ + "principal": "user@domain", + "action_type": "SELECT", + "any_file": "true", + "anonymous_function": "false", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + Grant(principal="user@domain", action_type="SELECT", anonymous_function=True), + Row( + workspace_id=2, + job_run_id=1, + object_type="Grant", + object_id=["ANONYMOUS FUNCTION", "", "SELECT", "user@domain"], + data={ + "principal": "user@domain", + "action_type": "SELECT", + "any_file": "false", + "anonymous_function": "true", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + Grant(principal="user@domain", action_type="ALL_PRIVILEGES", database="foo"), + Row( + workspace_id=2, + job_run_id=1, + object_type="Grant", + object_id=["DATABASE", "hive_metastore.foo", "ALL_PRIVILEGES", "user@domain"], + data={ + "principal": "user@domain", + "action_type": "ALL_PRIVILEGES", + "database": "foo", + "any_file": "false", + "anonymous_function": "false", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ), +) +def test_grant_supports_history(mock_backend, grant_record: Grant, history_record: Row) -> None: + """Verify that Grant records are written to the history log as expected.""" + mock_ownership = create_autospec(GrantOwnership) + mock_ownership.owner_of.return_value = "the_admin" + history_log = HistoryLog[Grant](mock_backend, mock_ownership, Grant, run_id=1, workspace_id=2, catalog="a_catalog") + + history_log.append_inventory_snapshot([grant_record]) + + rows = mock_backend.rows_written_for("`a_catalog`.`multiworkspace`.`historical`", mode="append") + + assert rows == [history_record] diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index 79fc692390..f0d7360530 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -7,10 +7,12 @@ import pytest from databricks.labs.lsql.backends import MockBackend, SqlBackend +from databricks.labs.lsql.core import Row from databricks.sdk import WorkspaceClient from databricks.sdk.errors import NotFound from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo, TableInfo +from databricks.labs.ucx.__about__ import __version__ as ucx_version from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore.grants import MigrateGrants @@ -37,6 +39,7 @@ What, ) from databricks.labs.ucx.hive_metastore.view_migrate import ViewToMigrate +from databricks.labs.ucx.progress.history import HistoryLog from .. 
import mock_table_mapping, mock_workspace_client @@ -1539,6 +1542,81 @@ def test_table_migration_status_source_table_unknown() -> None: table_ownership.owner_of.assert_not_called() +@pytest.mark.parametrize( + "table_migration_status_record,history_record", + ( + ( + TableMigrationStatus( + src_schema="foo", + src_table="bar", + dst_catalog="main", + dst_schema="fu", + dst_table="baz", + update_ts="2024-10-18T16:34:00Z", + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="TableMigrationStatus", + object_id=["foo", "bar"], + data={ + "src_schema": "foo", + "src_table": "bar", + "dst_catalog": "main", + "dst_schema": "fu", + "dst_table": "baz", + "update_ts": "2024-10-18T16:34:00Z", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + TableMigrationStatus( + src_schema="foo", + src_table="bar", + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="TableMigrationStatus", + object_id=["foo", "bar"], + data={ + "src_schema": "foo", + "src_table": "bar", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ), +) +def test_table_migration_status_supports_history( + mock_backend, + table_migration_status_record: TableMigrationStatus, + history_record: Row, +) -> None: + """Verify that TableMigrationStatus records are written as expected to the history log.""" + table_migration_ownership = create_autospec(TableMigrationOwnership) + table_migration_ownership.owner_of.return_value = "the_admin" + history_log = HistoryLog[TableMigrationStatus]( + mock_backend, + table_migration_ownership, + TableMigrationStatus, + run_id=1, + workspace_id=2, + catalog="a_catalog", + ) + + history_log.append_inventory_snapshot([table_migration_status_record]) + + rows = mock_backend.rows_written_for("`a_catalog`.`multiworkspace`.`historical`", mode="append") + + assert rows == [history_record] + + class MockBackendWithGeneralException(MockBackend): """Mock backend that allows raising a general exception. 
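For context on the history-log assertions in the tests above and below: the following is a minimal, illustrative sketch (not part of the patch) of how the new `HistoricalEncoder` turns a record's `__id_attributes__` and dataclass fields into the `object_id`, `data` and `failures` values these tests assert on. `ExampleRecord` is a hypothetical record type invented purely for illustration; `HistoricalEncoder` and `Ownership` are the classes introduced and used elsewhere in this diff, and the ownership stubbing mirrors what the unit tests do.

# Editor's illustration, not part of the patch.
import datetime as dt
from dataclasses import dataclass
from typing import ClassVar
from unittest.mock import create_autospec

from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.progress.history import HistoricalEncoder


@dataclass(frozen=True, kw_only=True)
class ExampleRecord:
    # Hypothetical record type used only for this sketch.
    name: str                # string fields are copied into `data` as-is
    updated_at: dt.datetime  # non-string fields are JSON/ISO encoded
    failures: list[str]      # lifted out of `data` into the top-level `failures` column

    __id_attributes__: ClassVar[tuple[str, ...]] = ("name",)


# The unit tests stub Ownership the same way.
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "an_admin"

encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=ExampleRecord)
record = ExampleRecord(
    name="example",
    updated_at=dt.datetime(2024, 10, 18, 16, 34, tzinfo=dt.timezone.utc),
    failures=["something-went-wrong"],
)
historical = encoder.to_historical(record)

assert historical.object_type == "ExampleRecord"
assert historical.object_id == ["example"]
assert historical.data == {"name": "example", "updated_at": "2024-10-18T16:34:00Z"}
assert historical.failures == ["something-went-wrong"]
assert historical.owner == "an_admin"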
diff --git a/tests/unit/hive_metastore/test_tables.py b/tests/unit/hive_metastore/test_tables.py index 9cad5a003d..03d6686d48 100644 --- a/tests/unit/hive_metastore/test_tables.py +++ b/tests/unit/hive_metastore/test_tables.py @@ -4,8 +4,11 @@ import pytest from databricks.labs.lsql.backends import MockBackend +from databricks.labs.lsql.core import Row +from databricks.labs.ucx.progress.history import HistoryLog from databricks.sdk import WorkspaceClient +from databricks.labs.ucx.__about__ import __version__ as ucx_version from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.hive_metastore.locations import Mount, ExternalLocations, MountsCrawler from databricks.labs.ucx.hive_metastore.tables import ( @@ -679,3 +682,82 @@ def test_table_owner() -> None: assert owner == "an_admin" admin_locator.get_workspace_administrator.assert_called_once() + + +@pytest.mark.parametrize( + "table_record,history_record", + ( + ( + Table( + catalog="hive_metastore", + database="foo", + name="bar", + object_type="TABLE", + table_format="DELTA", + location="/foo", + storage_properties="[foo=fu,bar=baz]", + is_partitioned=True, + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="Table", + object_id=["hive_metastore", "foo", "bar"], + data={ + "catalog": "hive_metastore", + "database": "foo", + "name": "bar", + "object_type": "TABLE", + "table_format": "DELTA", + "location": "/foo", + "storage_properties": "[foo=fu,bar=baz]", + "is_partitioned": "true", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + Table( + catalog="hive_metastore", + database="foo", + name="baz", + object_type="VIEW", + table_format="UNKNOWN", + view_text="select 1", + upgraded_to="main.foo.baz", + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="Table", + object_id=["hive_metastore", "foo", "baz"], + data={ + "catalog": "hive_metastore", + "database": "foo", + "name": "baz", + "object_type": "VIEW", + "table_format": "UNKNOWN", + "view_text": "select 1", + "upgraded_to": "main.foo.baz", + "is_partitioned": "false", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ), +) +def test_table_supports_history(mock_backend, table_record: Table, history_record: Row) -> None: + """Verify that Table records are written as expected to the history log.""" + mock_ownership = create_autospec(TableOwnership) + mock_ownership.owner_of.return_value = "the_admin" + history_log = HistoryLog[Table](mock_backend, mock_ownership, Table, run_id=1, workspace_id=2, catalog="a_catalog") + + history_log.append_inventory_snapshot([table_record]) + + rows = mock_backend.rows_written_for("`a_catalog`.`multiworkspace`.`historical`", mode="append") + + assert rows == [history_record] diff --git a/tests/unit/hive_metastore/test_udfs.py b/tests/unit/hive_metastore/test_udfs.py index d1c87d66ad..db6cd97e1a 100644 --- a/tests/unit/hive_metastore/test_udfs.py +++ b/tests/unit/hive_metastore/test_udfs.py @@ -1,9 +1,13 @@ from unittest.mock import create_autospec +import pytest from databricks.labs.lsql.backends import MockBackend +from databricks.labs.lsql.core import Row +from databricks.labs.ucx.__about__ import __version__ as ucx_version from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.hive_metastore.udfs import Udf, UdfsCrawler, UdfOwnership +from databricks.labs.ucx.progress.history import HistoryLog def test_key(): @@ -70,3 +74,96 @@ def test_udf_owner() -> None: assert owner == "an_admin" 
admin_locator.get_workspace_administrator.assert_called_once() + + +@pytest.mark.parametrize( + "udf_record,history_record", + ( + ( + Udf( + catalog="hive_metastore", + database="foo", + name="bar", + func_type="UNKNOWN-1", + func_input="UNKNOWN-2", + func_returns="UNKNOWN-3", + deterministic=True, + data_access="UNKNOWN-4", + body="UNKNOWN-5", + comment="UNKNOWN-6", + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="Udf", + object_id=["hive_metastore", "foo", "bar"], + data={ + "catalog": "hive_metastore", + "database": "foo", + "name": "bar", + "func_type": "UNKNOWN-1", + "func_input": "UNKNOWN-2", + "func_returns": "UNKNOWN-3", + "deterministic": "true", + "data_access": "UNKNOWN-4", + "body": "UNKNOWN-5", + "comment": "UNKNOWN-6", + "success": "1", + }, + failures=[], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ( + Udf( + catalog="hive_metastore", + database="foo", + name="bar", + func_type="UNKNOWN-1", + func_input="UNKNOWN-2", + func_returns="UNKNOWN-3", + deterministic=True, + data_access="UNKNOWN-4", + body="UNKNOWN-5", + comment="UNKNOWN-6", + success=0, + # Note: NOT json-encoded as is the convention elsewhere. + failures="something_is_wrong_with_this_udf", + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="Udf", + object_id=["hive_metastore", "foo", "bar"], + data={ + "catalog": "hive_metastore", + "database": "foo", + "name": "bar", + "func_type": "UNKNOWN-1", + "func_input": "UNKNOWN-2", + "func_returns": "UNKNOWN-3", + "deterministic": "true", + "data_access": "UNKNOWN-4", + "body": "UNKNOWN-5", + "comment": "UNKNOWN-6", + "success": "0", + }, + failures=["something_is_wrong_with_this_udf"], + owner="the_admin", + ucx_version=ucx_version, + ), + ), + ), +) +def test_udf_supports_history(mock_backend, udf_record: Udf, history_record: Row) -> None: + """Verify that Udf records are written as expected to the history log.""" + mock_ownership = create_autospec(UdfOwnership) + mock_ownership.owner_of.return_value = "the_admin" + history_log = HistoryLog[Udf](mock_backend, mock_ownership, Udf, run_id=1, workspace_id=2, catalog="a_catalog") + + history_log.append_inventory_snapshot([udf_record]) + + rows = mock_backend.rows_written_for("`a_catalog`.`multiworkspace`.`historical`", mode="append") + + assert rows == [history_record] diff --git a/tests/unit/progress/test_history.py b/tests/unit/progress/test_history.py new file mode 100644 index 0000000000..afa2e406d6 --- /dev/null +++ b/tests/unit/progress/test_history.py @@ -0,0 +1,563 @@ +import datetime as dt +import json +import re +from dataclasses import dataclass, field +from enum import Enum +from typing import ClassVar +from unittest.mock import create_autospec + +import pytest +from databricks.labs.lsql.core import Row + +from databricks.labs.ucx.__about__ import __version__ as ucx_version +from databricks.labs.ucx.framework.owners import Ownership +from databricks.labs.ucx.progress.history import HistoricalEncoder, HistoryLog, Record, DataclassWithIdAttributes +from databricks.labs.ucx.progress.install import Historical + + +@dataclass(frozen=True, kw_only=True) +class _TestRecord: + a_field: str + b_field: int + failures: list[str] + + __id_attributes__: ClassVar[tuple[str]] = ("a_field",) + + +@pytest.fixture +def ownership() -> Ownership: + mock_ownership = create_autospec(Ownership) + mock_ownership.owner_of.return_value = "mickey" + return mock_ownership + + +def test_historical_encoder_basic(ownership) -> None: + """Verify basic encoding of a test record into a historical record.""" 
+ encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_TestRecord) + + assert ownership.owner_of(_TestRecord(a_field="fu", b_field=2, failures=["doh", "ray"])) == "mickey" + + record = encoder.to_historical(_TestRecord(a_field="fu", b_field=2, failures=["doh", "ray"])) + + expected_record = Historical( + workspace_id=2, + job_run_id=1, + object_type="_TestRecord", + object_id=["fu"], + data={ + "a_field": "fu", + "b_field": "2", + }, + failures=["doh", "ray"], + owner="mickey", + ucx_version=ucx_version, + ) + + assert record == expected_record + + +def test_historical_encoder_workspace_id(ownership) -> None: + """Verify the encoder produces records using the supplied workspace identifier.""" + encoder = HistoricalEncoder(job_run_id=1, workspace_id=52, ownership=ownership, klass=_TestRecord) + + record = _TestRecord(a_field="whatever", b_field=2, failures=[]) + historical = encoder.to_historical(record) + assert historical.workspace_id == 52 + + +def test_historical_encoder_run_id(ownership) -> None: + """Verify the encoder produces records using the supplied job-run identifier.""" + encoder = HistoricalEncoder(job_run_id=42, workspace_id=2, ownership=ownership, klass=_TestRecord) + + record = _TestRecord(a_field="whatever", b_field=2, failures=[]) + historical = encoder.to_historical(record) + assert historical.job_run_id == 42 + + +def test_historical_encoder_ucx_version(ownership) -> None: + """Verify the encoder produces records containing the current UCX version.""" + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_TestRecord) + + record = _TestRecord(a_field="whatever", b_field=2, failures=[]) + historical = encoder.to_historical(record) + assert historical.ucx_version == ucx_version + + +def test_historical_encoder_ownership(ownership) -> None: + """Verify the encoder produces records with the owner determined by the supplied ownership instance.""" + expected_owners = ("bob", "jane", "tarzan") + ownership.owner_of.side_effect = expected_owners + + records = [_TestRecord(a_field="whatever", b_field=x, failures=[]) for x in range(len(expected_owners))] + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_TestRecord) + + encoded_records = [encoder.to_historical(record) for record in records] + owners = tuple(encoded_record.owner for encoded_record in encoded_records) + + assert owners == expected_owners + assert ownership.owner_of.call_count == 3 + + +def test_historical_encoder_object_type(ownership) -> None: + """Verify the encoder uses the name of the record type as the object type for records.""" + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_TestRecord) + + record = _TestRecord(a_field="whatever", b_field=2, failures=[]) + historical = encoder.to_historical(record) + assert historical.object_type == "_TestRecord" + + +def test_historical_encoder_object_id(ownership) -> None: + """Verify the encoder uses the configured object-id fields from the record type in the encoded records.""" + encoder1 = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_TestRecord) + + historical1 = encoder1.to_historical(_TestRecord(a_field="used_for_key", b_field=2, failures=[])) + assert historical1.object_id == ["used_for_key"] + + @dataclass + class _CompoundKey: + a_field: str = "field-a" + b_field: str = "field-b" + c_field: str = "field-c" + + @property + def d_property(self) -> str: + return "property-d" + + __id_attributes__: 
ClassVar = ("a_field", "c_field", "b_field", "d_property") + + encoder2 = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_CompoundKey) + historical2 = encoder2.to_historical(_CompoundKey()) + + # Note: order matters + assert historical2.object_id == ["field-a", "field-c", "field-b", "property-d"] + + +def test_historical_encoder_object_id_verification_no_id(ownership) -> None: + """Check that during initialization we fail if there is no __id_attributes__ defined.""" + + @dataclass + class _NoId: + pass + + with pytest.raises(AttributeError) as excinfo: + HistoricalEncoder(job_run_id=1, workspace_id=1, ownership=ownership, klass=_NoId) + + assert excinfo.value.obj == _NoId + assert excinfo.value.name == "__id_attributes__" + + +@dataclass +class _WrongTypeIdFields: + ok: str + not_ok: int + + __id_attributes__: ClassVar = ["ok", "not_ok"] + + +@dataclass +class _WrongTypeIdProperty: + ok: str + + @property + def not_ok(self) -> int: + return 0 + + __id_attributes__: ClassVar = ["ok", "not_ok"] + + +@pytest.mark.parametrize("wrong_id_type_class", (_WrongTypeIdFields, _WrongTypeIdProperty)) +def test_historical_encoder_object_id_verification_wrong_type(ownership, wrong_id_type_class: type[Record]) -> None: + """Check that during initialization we fail if the id attributes are declared but are not strings.""" + + expected_msg = r"^Historical record has a non-string id attribute: not_ok \(type=<.*>\)$" + with pytest.raises(TypeError, match=expected_msg): + HistoricalEncoder(job_run_id=1, workspace_id=1, ownership=ownership, klass=wrong_id_type_class) + + +def test_historical_encoder_object_id_verification_no_property_type(ownership) -> None: + """Check that during initialization we fail if an id attribute is a property with no declared return type.""" + + @dataclass + class _NoTypeIdProperty: + ok: str + + @property + def not_ok(self): + return 0 + + __id_attributes__: ClassVar = ["ok", "not_ok"] + + expected_msg = "^Historical record has a property with no type as an id attribute: not_ok$" + with pytest.raises(TypeError, match=expected_msg): + HistoricalEncoder(job_run_id=1, workspace_id=1, ownership=ownership, klass=_NoTypeIdProperty) + + +def test_historical_encoder_object_id_verification_non_readable_property(ownership) -> None: + """Check that during initialization we fail if an id attribute refers to a non-readable property.""" + + @dataclass + class _NonReadableIdProperty: + ok: str + + __id_attributes__: ClassVar = ["ok", "not_ok"] + + # Has to be injected after class declaration to avoid being treated as a field.
+ _NonReadableIdProperty.not_ok = property(doc="A non-readable-property") # type: ignore[attr-defined] + + expected_msg = r"^Historical record has a non-readable property as an id attribute: not_ok$" + with pytest.raises(TypeError, match=expected_msg): + HistoricalEncoder(job_run_id=1, workspace_id=1, ownership=ownership, klass=_NonReadableIdProperty) + + +def test_historical_encoder_object_id_verification_missing_attribute(ownership) -> None: + """Check that during initialization we fail if an id attribute refers to an attribute that does not exist.""" + + @dataclass + class _MissingAttribute: + ok: str + + __id_attributes__: ClassVar = ["ok", "not_ok"] + + with pytest.raises(AttributeError) as excinfo: + HistoricalEncoder(job_run_id=1, workspace_id=1, ownership=ownership, klass=_MissingAttribute) + + assert excinfo.value.obj == _MissingAttribute + assert excinfo.value.name == "not_ok" + + +def test_historical_encoder_object_id_verification_not_field_or_property(ownership) -> None: + """Check that during initialization we fail if an id attribute refers to an attribute that isn't a field or property.""" + + @dataclass + class _NotFieldOrProperty: + ok: str + + def not_ok(self) -> str: + return "" + + __id_attributes__: ClassVar = ["ok", "not_ok"] + + expected_msg = r"^Historical record declares an id attribute that is not a field or property: not_ok \(type=<.*>\)$" + with pytest.raises(TypeError, match=expected_msg): + HistoricalEncoder(job_run_id=1, workspace_id=1, ownership=ownership, klass=_NotFieldOrProperty) + + +def test_historical_encoder_object_data(ownership) -> None: + """Verify the encoder includes all dataclass fields in the object data.""" + encoder1 = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_TestRecord) + + historical1 = encoder1.to_historical(_TestRecord(a_field="used_for_key", b_field=2, failures=[])) + assert set(historical1.data.keys()) == {"a_field", "b_field"} + + @dataclass + class _AnotherClass: + field_1: str = "foo" + field_2: str = "bar" + field_3: str = "baz" + field_4: str = "daz" + + __id_attributes__: ClassVar = ("field_1",) + + encoder2 = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_AnotherClass) + historical2 = encoder2.to_historical(_AnotherClass()) + assert set(historical2.data.keys()) == {"field_1", "field_2", "field_3", "field_4"} + + +def test_historical_encoder_object_data_values_strings_as_is(ownership) -> None: + """Verify that string fields are encoded as-is in the object_data""" + + @dataclass + class _AClass: + a_field: str = "value" + existing_json_field: str = "[1, 2, 3]" + optional_string_field: str | None = "value" + + __id_attributes__: ClassVar = ("a_field",) + + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_AClass) + historical = encoder.to_historical(_AClass()) + assert historical.data == {"a_field": "value", "existing_json_field": "[1, 2, 3]", "optional_string_field": "value"} + + +def test_historical_encoder_object_data_missing_optional_values(ownership) -> None: + """Verify the encoding of missing (optional) field values.""" + + @dataclass(frozen=True) + class _InnerClass: + optional_field: str | None = None + + @dataclass + class _AClass: + a_field: str = "value" + optional_field: str | None = None + nested: _InnerClass = _InnerClass() + + __id_attributes__: ClassVar = ("a_field",) + + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_AClass) + historical = encoder.to_historical(_AClass()) + assert
"optional_field" not in historical.data, "First-level optional fields should be elided if None" + assert historical.data["nested"] == '{"optional_field":null}', "Nested optional fields should be encoded as nulls" + + +def test_historical_encoder_object_data_values_non_strings_as_json(ownership) -> None: + """Verify that non-string fields are encoded as JSON in the object_data""" + + @dataclass(frozen=True) + class _InnerClass: + counter: int + boolean: bool = True + a_field: str = "bar" + optional: str | None = None + + class _Suit(Enum): + HEARTS = 1 + DIAMONDS = 2 + CLUBS = 3 + SPADES = 4 + + @dataclass + class _AClass: + str_field: str = "foo" + int_field: int = 23 + bool_field: bool = True + float_field: float = 2.3 + enum_field: _Suit = _Suit.HEARTS + date_field: dt.date = field(default_factory=lambda: dt.date(year=2024, month=10, day=15)) + ts_field: dt.datetime = field( + default_factory=lambda: dt.datetime( + year=2024, month=10, day=15, hour=12, minute=44, second=16, tzinfo=dt.timezone.utc + ) + ) + array_field: list[str] = field(default_factory=lambda: ["foo", "bar", "baz"]) + set_field: set[str] = field(default_factory=lambda: {"fu", "baa", "boz"}) + dict_field: dict[int, str] = field(default_factory=lambda: {1000: "M", 100: "C"}) + nested_dataclass: list[_InnerClass] = field(default_factory=lambda: [_InnerClass(x) for x in range(2)]) + + __id_attributes__: ClassVar = ("str_field",) + + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_AClass) + historical = encoder.to_historical(_AClass()) + # Python set iteration doesn't preserve order, so we need to check set_field separately. + set_field = historical.data.pop("set_field") + assert historical.data == { + "str_field": "foo", + "int_field": "23", + "bool_field": "true", + "float_field": "2.3", + "enum_field": "HEARTS", + "date_field": "2024-10-15", + "ts_field": "2024-10-15T12:44:16Z", + "array_field": '["foo","bar","baz"]', + "dict_field": '{"1000":"M","100":"C"}', + "nested_dataclass": '[{"counter":0,"boolean":true,"a_field":"bar","optional":null},{"counter":1,"boolean":true,"a_field":"bar","optional":null}]', + } + decoded_set_field = json.loads(set_field) + assert isinstance(decoded_set_field, list) and len(decoded_set_field) == 3 + assert set(decoded_set_field) == {"fu", "baa", "boz"} + + +def test_historical_encoder_object_data_imposter_string_values(ownership) -> None: + """Verify that string fields containing non-string values are handled as an error.""" + + @dataclass + class _AClass: + a_field: str = "value" + the_string_field: str | None = None + + __id_attributes__: ClassVar = ("a_field",) + + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_AClass) + record_with_imposter = _AClass(the_string_field=2) # type: ignore[arg-type] + with pytest.raises(ValueError, match=r"^Invalid value for field the_string_field, not a string: 2$"): + _ = encoder.to_historical(record_with_imposter) + + +@dataclass(frozen=True, kw_only=True) +class _InnerClassWithTimestamp: + b_field: dt.datetime + + +@dataclass(frozen=True, kw_only=True) +class _OuterclassWithTimestamps: + object_id: str = "not used" + a_field: dt.datetime | None = None + inner: _InnerClassWithTimestamp | None = None + + __id_attributes__: ClassVar = ("object_id",) + + +@pytest.mark.parametrize( + "field_name,record", + ( + ("a_field", _OuterclassWithTimestamps(a_field=dt.datetime.now())), + ("inner", _OuterclassWithTimestamps(inner=_InnerClassWithTimestamp(b_field=dt.datetime.now()))), + ), +) +def 
test_historical_encoder_naive_timestamps_banned(ownership, field_name, record: _OuterclassWithTimestamps) -> None: + """Verify that encoding detects and disallows naive timestamps.""" + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_OuterclassWithTimestamps) + + expected_msg = f"Timestamp without timezone not supported in or within field {field_name}" + with pytest.raises(ValueError, match=f"^{re.escape(expected_msg)}"): + _ = encoder.to_historical(record) + + +@dataclass(frozen=True, kw_only=True) +class _InnerClassWithUnserializable: + b_field: object + + +@dataclass(frozen=True, kw_only=True) +class _OuterclassWithUnserializable: + object_id: str = "not used" + a_field: object | None = None + inner: _InnerClassWithUnserializable | None = None + + __id_attributes__: ClassVar = ("object_id",) + + +@pytest.mark.parametrize( + "field_name,record", + ( + ("a_field", _OuterclassWithUnserializable(a_field=object())), + ("inner", _OuterclassWithUnserializable(inner=_InnerClassWithUnserializable(b_field=object()))), + ), +) +def test_historical_encoder_unserializable_values(ownership, field_name, record: _OuterclassWithUnserializable) -> None: + """Verify that encoding detects and disallows unserializable values.""" + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_OuterclassWithUnserializable) + + expected_msg = f"^Cannot encode .* value in or within field {re.escape(field_name)}: " + with pytest.raises(TypeError, match=expected_msg): + _ = encoder.to_historical(record) + + +@pytest.mark.parametrize("failures", (["failures-1", "failures-2"], [])) +def test_historical_encoder_failures_list(ownership, failures: list[str]) -> None: + """Verify an encoder places a failures list on the top-level field instead of within the object data.""" + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_TestRecord) + + historical = encoder.to_historical(_TestRecord(a_field="foo", b_field=10, failures=list(failures))) + + assert historical.failures == failures + assert "failures" not in historical.data + + +@pytest.mark.parametrize("failures", ('["failures-1", "failures-2"]', '[]', '')) +def test_historical_encoder_json_encoded_failures_list(ownership, failures: str) -> None: + """Verify an encoder places a pre-encoded JSON list of failures on the top-level field instead of within the object data.""" + + @dataclass + class _FailuresEncodedJson: + failures: str + an_id: str = "the_id" + + __id_attributes__: ClassVar = ("an_id",) + + encoder = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=_FailuresEncodedJson) + + historical = encoder.to_historical(_FailuresEncodedJson(failures=failures)) + + expected_failures = json.loads(failures) if failures else [] + assert historical.failures == expected_failures + assert "failures" not in historical.data + + +@dataclass +class _BrokenFailures1: + a_field: str = "a_field" + failures: list[int] = field(default_factory=list) + + __id_attributes__: ClassVar = ("a_field",) + + +@dataclass +class _BrokenFailures2: + a_field: str = "a_field" + failures: None = None + + __id_attributes__: ClassVar = ("a_field",) + + +@pytest.mark.parametrize("klass,broken_type", ((_BrokenFailures1, list[int]), (_BrokenFailures2, None))) +def test_historical_encoder_failures_verification( + ownership, + klass: type[DataclassWithIdAttributes], + broken_type: type, +) -> None: + """Verify that the encoder checks the failures field type during initialization.""" + +
expected_msg = f"^Historical record {re.escape(str(klass))} has invalid 'failures' attribute of type: {re.escape(str(broken_type))}$" + with pytest.raises(TypeError, match=expected_msg): + _ = HistoricalEncoder(job_run_id=1, workspace_id=2, ownership=ownership, klass=klass) + + +def test_history_log_appends_historical_records(mock_backend, ownership) -> None: + """Verify that we can journal a snapshot of records to the historical log.""" + ownership.owner_of.side_effect = lambda o: f"owner-{o.a_field}" + + records = ( + _TestRecord(a_field="first_record", b_field=1, failures=[]), + _TestRecord(a_field="second_record", b_field=2, failures=["a_failure"]), + _TestRecord(a_field="third_record", b_field=3, failures=["another_failure", "yet_another_failure"]), + ) + expected_historical_entries = ( + Row( + workspace_id=2, + job_run_id=1, + object_type="_TestRecord", + object_id=["first_record"], + data={"a_field": "first_record", "b_field": "1"}, + failures=[], + owner="owner-first_record", + ucx_version=ucx_version, + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="_TestRecord", + object_id=["second_record"], + data={"a_field": "second_record", "b_field": "2"}, + failures=["a_failure"], + owner="owner-second_record", + ucx_version=ucx_version, + ), + Row( + workspace_id=2, + job_run_id=1, + object_type="_TestRecord", + object_id=["third_record"], + data={"a_field": "third_record", "b_field": "3"}, + failures=["another_failure", "yet_another_failure"], + owner="owner-third_record", + ucx_version=ucx_version, + ), + ) + + history_log = HistoryLog( + mock_backend, + ownership, + _TestRecord, + run_id=1, + workspace_id=2, + catalog="the_catalog", + schema="the_schema", + table="the_table", + ) + history_log.append_inventory_snapshot(records) + + rows_appended = mock_backend.rows_written_for("`the_catalog`.`the_schema`.`the_table`", mode="append") + assert rows_appended == list(expected_historical_entries) + + +def test_history_log_default_location(mock_backend, ownership) -> None: + """Verify that the history log defaults to the multiworkspace.historical table in the configured catalog.""" + + record = _TestRecord(a_field="foo", b_field=1, failures=[]) + history_log = HistoryLog(mock_backend, ownership, _TestRecord, run_id=1, workspace_id=2, catalog="the_catalog") + history_log.append_inventory_snapshot([record]) + + assert history_log.full_name == "the_catalog.multiworkspace.historical" + assert mock_backend.has_rows_written_for("`the_catalog`.`multiworkspace`.`historical`") diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py index 348f99d835..555eb49a04 100644 --- a/tests/unit/progress/test_workflows.py +++ b/tests/unit/progress/test_workflows.py @@ -1,7 +1,10 @@ +import datetime as dt from typing import get_type_hints from unittest.mock import create_autospec import pytest +from databricks.labs.ucx.hive_metastore import TablesCrawler +from databricks.labs.ucx.progress.history import HistoryLog from databricks.sdk import WorkspaceClient from databricks.sdk.service.catalog import CatalogInfo, MetastoreAssignment from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState @@ -11,24 +14,61 @@ @pytest.mark.parametrize( - "task, crawler", + "task, crawler, history_log", ( - (MigrationProgress.crawl_tables, RuntimeContext.tables_crawler), - (MigrationProgress.crawl_udfs, RuntimeContext.udfs_crawler), - (MigrationProgress.crawl_grants, RuntimeContext.grants_crawler), - (MigrationProgress.assess_jobs, RuntimeContext.jobs_crawler), -
(MigrationProgress.assess_clusters, RuntimeContext.clusters_crawler), - (MigrationProgress.assess_pipelines, RuntimeContext.pipelines_crawler), - (MigrationProgress.crawl_cluster_policies, RuntimeContext.policies_crawler), - (MigrationProgress.refresh_table_migration_status, RuntimeContext.migration_status_refresher), + (MigrationProgress.crawl_udfs, RuntimeContext.udfs_crawler, RuntimeContext.historical_udfs_log), + (MigrationProgress.crawl_grants, RuntimeContext.grants_crawler, RuntimeContext.historical_grants_log), + (MigrationProgress.assess_jobs, RuntimeContext.jobs_crawler, RuntimeContext.historical_jobs_log), + (MigrationProgress.assess_clusters, RuntimeContext.clusters_crawler, RuntimeContext.historical_clusters_log), + (MigrationProgress.assess_pipelines, RuntimeContext.pipelines_crawler, RuntimeContext.historical_pipelines_log), + ( + MigrationProgress.crawl_cluster_policies, + RuntimeContext.policies_crawler, + RuntimeContext.historical_cluster_policies_log, + ), + ( + MigrationProgress.refresh_table_migration_status, + RuntimeContext.migration_status_refresher, + RuntimeContext.historical_table_migration_log, + ), ), ) -def test_migration_progress_runtime_refresh(run_workflow, task, crawler) -> None: +def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history_log) -> None: crawler_class = get_type_hints(crawler.func)["return"] mock_crawler = create_autospec(crawler_class) + mock_history_log = create_autospec(HistoryLog) crawler_name = crawler.attrname - run_workflow(task, **{crawler_name: mock_crawler}) + history_log_name = history_log.attrname + context_replacements = { + crawler_name: mock_crawler, + history_log_name: mock_history_log, + "named_parameters": {"parent_run_id": 53}, + } + run_workflow(task, **context_replacements) mock_crawler.snapshot.assert_called_once_with(force_refresh=True) + mock_history_log.append_inventory_snapshot.assert_called_once() + + +def test_migration_progress_runtime_tables_refresh(run_workflow) -> None: + """Ensure that the split crawl and update-history-log tasks perform their part of the refresh process.""" + mock_tables_crawler = create_autospec(TablesCrawler) + mock_history_log = create_autospec(HistoryLog) + context_replacements = { + "tables_crawler": mock_tables_crawler, + "historical_tables_log": mock_history_log, + "named_parameters": {"parent_run_id": 53}, + } + + # The first part of a 2-step update: the crawl without updating the history log. + run_workflow(MigrationProgress.crawl_tables, **context_replacements) + mock_tables_crawler.snapshot.assert_called_once_with(force_refresh=True) + mock_history_log.append_inventory_snapshot.assert_not_called() + + mock_tables_crawler.snapshot.reset_mock() + # The second part of the 2-step update: updating the history log (without a forced crawl). 
+ run_workflow(MigrationProgress.update_tables_history_log, **context_replacements) + mock_tables_crawler.snapshot.assert_called_once_with() + mock_history_log.append_inventory_snapshot.assert_called_once() @pytest.mark.parametrize( @@ -67,3 +107,38 @@ def test_migration_progress_with_invalid_prerequisites(run_workflow) -> None: task = MigrationProgress.verify_prerequisites with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace."): run_workflow(task, workspace_client=ws) + + +def test_migration_progress_record_workflow_run(run_workflow, mock_backend) -> None: + """Verify that we log the workflow run.""" + task = MigrationProgress.record_workflow_run + start_time = dt.datetime.now(dt.timezone.utc).replace(microsecond=0) + context_replacements = { + "sql_backend": mock_backend, + "named_parameters": { + "workflow": "test", + "job_id": "123456", + "parent_run_id": "456", + "attempt": "0", + "start_time": start_time.isoformat(), + }, + } + + run_workflow(task, **context_replacements) + + rows = mock_backend.rows_written_for("ucx.multiworkspace.workflow_runs", "append") + + rows_as_dict = [{k: v for k, v in row.asDict().items() if k != 'finished_at'} for row in rows] + assert rows_as_dict == [ + { + "started_at": start_time, + # finished_at: checked below. + "workspace_id": 123, + "workflow_name": "test", + "workflow_id": 123456, + "workflow_run_id": 456, + "workflow_run_attempt": 0, + } + ] + # The finish time must be at or after the start time. + assert all(row["started_at"] <= row["finished_at"] for row in rows) diff --git a/tests/unit/source_code/test_directfs_access.py b/tests/unit/source_code/test_directfs_access.py index 2d1460abdb..c00c1cdcc6 100644 --- a/tests/unit/source_code/test_directfs_access.py +++ b/tests/unit/source_code/test_directfs_access.py @@ -1,4 +1,4 @@ -from datetime import datetime +import datetime as dt from unittest.mock import create_autospec from databricks.labs.lsql.backends import MockBackend @@ -17,16 +17,17 @@ def test_crawler_appends_dfsas() -> None: crawler = DirectFsAccessCrawler.for_paths(backend, "schema") existing = list(crawler.snapshot()) assert not existing + now = dt.datetime.now(tz=dt.timezone.utc) dfsas = list( DirectFsAccess( path=path, is_read=False, is_write=False, source_id="ID", - source_timestamp=datetime.now(), + source_timestamp=now, source_lineage=[LineageAtom(object_type="LINEAGE", object_id="ID")], - assessment_start_timestamp=datetime.now(), - assessment_end_timestamp=datetime.now(), + assessment_start_timestamp=now, + assessment_end_timestamp=now, ) for path in ("a", "b", "c") ) diff --git a/tests/unit/source_code/test_used_table.py b/tests/unit/source_code/test_used_table.py index 2ced362ef5..e432a923a8 100644 --- a/tests/unit/source_code/test_used_table.py +++ b/tests/unit/source_code/test_used_table.py @@ -1,4 +1,4 @@ -from datetime import datetime +import datetime as dt from databricks.labs.lsql.backends import MockBackend @@ -11,15 +11,16 @@ def test_crawler_appends_tables() -> None: crawler = UsedTablesCrawler.for_paths(backend, "schema") existing = list(crawler.snapshot()) assert not existing + now = dt.datetime.now(tz=dt.timezone.utc) dfsas = list( UsedTable( catalog_name="catalog", schema_name="schema", table_name=name, - source_timestamp=datetime.now(), + source_timestamp=now, source_lineage=[LineageAtom(object_type="LINEAGE", object_id="ID")], - assessment_start_timestamp=datetime.now(), - assessment_end_timestamp=datetime.now(), + assessment_start_timestamp=now, +
assessment_end_timestamp=now, ) for name in ("a", "b", "c") )