diff --git a/src/databricks/labs/ucx/assessment/clusters.py b/src/databricks/labs/ucx/assessment/clusters.py index 02badb64ec..0e0624d3c2 100644 --- a/src/databricks/labs/ucx/assessment/clusters.py +++ b/src/databricks/labs/ucx/assessment/clusters.py @@ -29,6 +29,7 @@ ) from databricks.labs.ucx.assessment.init_scripts import CheckInitScriptMixin from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier logger = logging.getLogger(__name__) @@ -43,6 +44,7 @@ class ClusterInfo: policy_id: str | None = None cluster_name: str | None = None creator: str | None = None + """User-name of the creator of the cluster, if known.""" class CheckClusterMixin(CheckInitScriptMixin): @@ -154,7 +156,8 @@ def _assess_clusters(self, all_clusters): for cluster in all_clusters: if cluster.cluster_source == ClusterSource.JOB: continue - if not cluster.creator_user_name: + creator = cluster.creator_user_name or None + if not creator: logger.warning( f"Cluster {cluster.cluster_id} have Unknown creator, it means that the original creator " f"has been deleted and should be re-created" @@ -164,7 +167,7 @@ def _assess_clusters(self, all_clusters): cluster_name=cluster.cluster_name, policy_id=cluster.policy_id, spark_version=cluster.spark_version, - creator=cluster.creator_user_name, + creator=creator, success=1, failures="[]", ) @@ -179,6 +182,16 @@ def _try_fetch(self) -> Iterable[ClusterInfo]: yield ClusterInfo(*row) +class ClusterOwnership(Ownership[ClusterInfo]): + """Determine ownership of clusters in the inventory. + + This is the cluster creator (if known). + """ + + def _maybe_direct_owner(self, record: ClusterInfo) -> str | None: + return record.creator + + @dataclass class PolicyInfo: policy_id: str @@ -188,6 +201,7 @@ class PolicyInfo: spark_version: str | None = None policy_description: str | None = None creator: str | None = None + """User-name of the creator of the cluster policy, if known.""" class PoliciesCrawler(CrawlerBase[PolicyInfo], CheckClusterMixin): @@ -210,7 +224,7 @@ def _assess_policies(self, all_policices) -> Iterable[PolicyInfo]: except KeyError: spark_version = None policy_name = policy.name - creator_name = policy.creator_user_name + creator_name = policy.creator_user_name or None policy_info = PolicyInfo( policy_id=policy.policy_id, @@ -229,3 +243,13 @@ def _assess_policies(self, all_policices) -> Iterable[PolicyInfo]: def _try_fetch(self) -> Iterable[PolicyInfo]: for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"): yield PolicyInfo(*row) + + +class ClusterPolicyOwnership(Ownership[PolicyInfo]): + """Determine ownership of cluster policies in the inventory. + + This is the creator of the cluster policy (if known). 
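+
+    A minimal usage sketch (illustrative; administrator_locator is assumed to be an AdministratorLocator for the workspace):
+
+        ownership = ClusterPolicyOwnership(administrator_locator)
+        # No creator is recorded on this record, so owner_of() falls back to a workspace administrator.
+        owner = ownership.owner_of(PolicyInfo(policy_id="1", policy_name="a-policy", success=1, failures="[]"))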
+ """ + + def _maybe_direct_owner(self, record: PolicyInfo) -> str | None: + return record.creator diff --git a/src/databricks/labs/ucx/assessment/jobs.py b/src/databricks/labs/ucx/assessment/jobs.py index d5b77d68e0..3c6a4afa84 100644 --- a/src/databricks/labs/ucx/assessment/jobs.py +++ b/src/databricks/labs/ucx/assessment/jobs.py @@ -25,6 +25,7 @@ from databricks.labs.ucx.assessment.clusters import CheckClusterMixin from databricks.labs.ucx.assessment.crawlers import spark_version_compatibility from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier logger = logging.getLogger(__name__) @@ -37,6 +38,7 @@ class JobInfo: failures: str job_name: str | None = None creator: str | None = None + """User-name of the creator of the pipeline, if known.""" class JobsMixin: @@ -106,7 +108,8 @@ def _prepare(all_jobs) -> tuple[dict[int, set[str]], dict[int, JobInfo]]: if not job.job_id: continue job_assessment[job.job_id] = set() - if not job.creator_user_name: + creator_user_name = job.creator_user_name or None + if not creator_user_name: logger.warning( f"Job {job.job_id} have Unknown creator, it means that the original creator has been deleted " f"and should be re-created" @@ -122,7 +125,7 @@ def _prepare(all_jobs) -> tuple[dict[int, set[str]], dict[int, JobInfo]]: job_details[job.job_id] = JobInfo( job_id=str(job.job_id), job_name=job_name, - creator=job.creator_user_name, + creator=creator_user_name, success=1, failures="[]", ) @@ -140,6 +143,16 @@ def _check_jar_task(self, all_task: list[RunTask]) -> list[str]: return task_failures +class JobOwnership(Ownership[JobInfo]): + """Determine ownership of jobs (workflows) in the inventory. + + This is the job creator (if known). 
+ """ + + def _maybe_direct_owner(self, record: JobInfo) -> str | None: + return record.creator + + @dataclass class SubmitRunInfo: run_ids: str # JSON-encoded list of run ids diff --git a/src/databricks/labs/ucx/assessment/pipelines.py b/src/databricks/labs/ucx/assessment/pipelines.py index 8421e53084..19bc8c558b 100644 --- a/src/databricks/labs/ucx/assessment/pipelines.py +++ b/src/databricks/labs/ucx/assessment/pipelines.py @@ -8,6 +8,7 @@ from databricks.labs.ucx.assessment.clusters import CheckClusterMixin from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier logger = logging.getLogger(__name__) @@ -20,6 +21,7 @@ class PipelineInfo: failures: str pipeline_name: str | None = None creator_name: str | None = None + """User-name of the creator of the pipeline, if known.""" class PipelinesCrawler(CrawlerBase[PipelineInfo], CheckClusterMixin): @@ -33,7 +35,8 @@ def _crawl(self) -> Iterable[PipelineInfo]: def _assess_pipelines(self, all_pipelines) -> Iterable[PipelineInfo]: for pipeline in all_pipelines: - if not pipeline.creator_user_name: + creator_name = pipeline.creator_user_name or None + if not creator_name: logger.warning( f"Pipeline {pipeline.name} have Unknown creator, it means that the original creator " f"has been deleted and should be re-created" @@ -41,7 +44,7 @@ def _assess_pipelines(self, all_pipelines) -> Iterable[PipelineInfo]: pipeline_info = PipelineInfo( pipeline_id=pipeline.pipeline_id, pipeline_name=pipeline.name, - creator_name=pipeline.creator_user_name, + creator_name=creator_name, success=1, failures="[]", ) @@ -73,3 +76,13 @@ def _pipeline_clusters(self, clusters, failures): def _try_fetch(self) -> Iterable[PipelineInfo]: for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"): yield PipelineInfo(*row) + + +class PipelineOwnership(Ownership[PipelineInfo]): + """Determine ownership of pipelines in the inventory. + + This is the pipeline creator (if known). 
+ """ + + def _maybe_direct_owner(self, record: PipelineInfo) -> str | None: + return record.creator_name diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index 9bee3e3537..c1e335734e 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -28,6 +28,7 @@ from databricks.labs.ucx.assessment.export import AssessmentExporter from databricks.labs.ucx.aws.credentials import CredentialManager from databricks.labs.ucx.config import WorkspaceConfig +from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.hive_metastore import ExternalLocations, Mounts, TablesCrawler from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema from databricks.labs.ucx.hive_metastore.grants import ( @@ -514,6 +515,10 @@ def migration_recon(self) -> MigrationRecon: self.config.recon_tolerance_percent, ) + @cached_property + def administrator_locator(self) -> AdministratorLocator: + return AdministratorLocator(self.workspace_client) + class CliContext(GlobalContext, abc.ABC): @cached_property diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 30440e2bfe..ea1aa07e3b 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -73,7 +73,7 @@ def pipelines_crawler(self) -> PipelinesCrawler: @cached_property def table_size_crawler(self) -> TableSizeCrawler: - return TableSizeCrawler(self.sql_backend, self.inventory_database, self.config.include_databases) + return TableSizeCrawler(self.tables_crawler) @cached_property def policies_crawler(self) -> PoliciesCrawler: diff --git a/src/databricks/labs/ucx/framework/crawlers.py b/src/databricks/labs/ucx/framework/crawlers.py index 48d774d403..4c89cde902 100644 --- a/src/databricks/labs/ucx/framework/crawlers.py +++ b/src/databricks/labs/ucx/framework/crawlers.py @@ -21,7 +21,7 @@ class DataclassInstance(Protocol): class CrawlerBase(ABC, Generic[Result]): - def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]): + def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]) -> None: """ Initializes a CrawlerBase instance. 
diff --git a/src/databricks/labs/ucx/framework/owners.py b/src/databricks/labs/ucx/framework/owners.py new file mode 100644 index 0000000000..4edef7a5e8 --- /dev/null +++ b/src/databricks/labs/ucx/framework/owners.py @@ -0,0 +1,196 @@ +import logging +from abc import ABC, abstractmethod +from collections.abc import Callable, Iterable, Sequence +from functools import cached_property +from typing import ClassVar, Generic, Protocol, TypeVar, final + +from databricks.sdk import WorkspaceClient +from databricks.sdk.errors import NotFound +from databricks.sdk.service.iam import User + +logger = logging.getLogger(__name__) + + +class DataclassInstance(Protocol): + __dataclass_fields__: ClassVar[dict] + + +Record = TypeVar("Record") + + +class AdministratorFinder(ABC): + def __init__(self, ws: WorkspaceClient) -> None: + self._ws = ws + + @abstractmethod + def find_admin_users(self) -> Iterable[User]: + """Locate active admin users.""" + raise NotImplementedError() + + +class WorkspaceAdministratorFinder(AdministratorFinder): + """Locate the users that are in the 'admin' workspace group for a given workspace.""" + + @staticmethod + def _member_of_group_named(user: User, group_name: str) -> bool: + """Determine whether a user belongs to a group with the given name or not.""" + return user.groups is not None and any(g.display == group_name for g in user.groups) + + @staticmethod + def _member_of_group(user: User, group_id: str) -> bool: + """Determine whether a user belongs to a group with the given identifier or not.""" + return user.groups is not None and any(g.value == group_id for g in user.groups) + + def _is_active_admin(self, user: User) -> bool: + """Determine if a user is an active administrator.""" + return bool(user.active) and self._member_of_group_named(user, "admins") + + def _is_workspace_group(self, group_id: str) -> bool: + """Determine whether a group_id corresponds to a workspace group or not.""" + try: + group = self._ws.groups.get(group_id) + except NotFound: + return False + return bool(group.meta and group.meta.resource_type == "WorkspaceGroup") + + def find_admin_users(self) -> Iterable[User]: + """Enumerate the active workspace administrators in a given workspace. + + Returns: + Iterable[User]: The active workspace administrators, if any. + """ + logger.debug("Enumerating users to locate active workspace administrators...") + all_users = self._ws.users.list(attributes="id,active,userName,groups") + # The groups attribute is a flattened list of groups a user belongs to; hunt for the 'admins' workspace group. + # Reference: https://learn.microsoft.com/en-us/azure/databricks/admin/users-groups/groups#account-vs-workspace-group + admin_users = [user for user in all_users if user.user_name and self._is_active_admin(user)] + logger.debug(f"Verifying membership of the 'admins' workspace group for users: {admin_users}") + maybe_admins_id = set() + for user in admin_users: + if not user.groups: + continue + for group in user.groups: + if group.display == "admins" and group.value: + maybe_admins_id.add(group.value) + # There can only be a single 'admins' workspace group. 
+ for group_id in maybe_admins_id: + if self._is_workspace_group(group_id): + return (user for user in admin_users if self._member_of_group(user, group_id)) + return () + + +class AccountAdministratorFinder(AdministratorFinder): + """Locate the users that are account administrators for this workspace.""" + + @staticmethod + def _has_role(user: User, role: str) -> bool: + """Determine whether a user has a given role or not.""" + return user.roles is not None and any(r.value == role for r in user.roles) + + def find_admin_users(self) -> Iterable[User]: + """Enumerate the active account administrators associated with a given workspace. + + Returns: + Iterable[User]: The active account administrators, if any. + """ + logger.debug("Enumerating account users to locate active administrators...") + response = self._ws.api_client.do( + "GET", "/api/2.0/account/scim/v2/Users", query={"attributes": "id,active,userName,roles"} + ) + assert isinstance(response, dict) + all_users = (User.from_dict(resource) for resource in response.get("Resources", [])) + # Reference: https://learn.microsoft.com/en-us/azure/databricks/admin/users-groups/groups#account-admin + return (user for user in all_users if user.active and user.user_name and self._has_role(user, "account_admin")) + + +class AdministratorLocator: + """Locate a workspace administrator, if possible. + + This will first try to find an active workspace administrator. If there are multiple, the first (alphabetically + sorted by user-name) will be used. If no active workspace administrators can be found then an account administrator + is sought, again returning the first alphabetically by user-name if more than one is found. + """ + + def __init__( + self, + ws: WorkspaceClient, + *, + finders: Sequence[Callable[[WorkspaceClient], AdministratorFinder]] = ( + WorkspaceAdministratorFinder, + AccountAdministratorFinder, + ), + ) -> None: + """ + Initialize the instance, which will try to locate administrators using the workspace for the supplied client. + + Args: + ws (WorkspaceClient): the client for workspace in which to locate admin users. + finders: a sequence of factories that will be instantiated on demand to locate admin users. + """ + self._ws = ws + self._finders = finders + + @cached_property + def _workspace_id(self) -> int: + # Makes a REST call, so we cache it. + return self._ws.get_workspace_id() + + @cached_property + def _found_admin(self) -> str | None: + + # Ordering helper: User.user_name is typed as optional but we can't sort by None. + # (The finders already filter out users without a user-name.) + def _by_username(user: User) -> str: + assert user.user_name + return user.user_name + + # Lazily instantiate and query the finders in an attempt to locate an admin user. + for factory in self._finders: + finder = factory(self._ws) + # First alphabetically by name. + admin_user = min(finder.find_admin_users(), default=None, key=_by_username) + if admin_user: + return admin_user.user_name + return None + + def get_workspace_administrator(self) -> str: + """The user-name of an admin user for the workspace. + + Raises: + RuntimeError if an admin user cannot be found in the current workspace. 
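+
+        A usage sketch (illustrative; ws is assumed to be a configured WorkspaceClient):
+
+            locator = AdministratorLocator(ws)
+            admin_user_name = locator.get_workspace_administrator()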
+ """ + found_admin = self._found_admin + if found_admin is None: + msg = f"No active workspace or account administrator can be found for workspace: {self._workspace_id}" + raise RuntimeError(msg) + return found_admin + + +class Ownership(ABC, Generic[Record]): + """Determine an owner for a given type of object.""" + + def __init__(self, administrator_locator: AdministratorLocator) -> None: + self._administrator_locator = administrator_locator + + @final + def owner_of(self, record: Record) -> str: + """Obtain the user-name of a user that is responsible for the given record. + + This is intended to be a point of contact, and is either: + + - A user directly associated with the resource, such as the original creator; or + - An active administrator for the current workspace. + + Args: + record (Record): The record for which an associated user-name is sought. + Returns: + A string containing the user-name attribute of a user considered to be responsible for the resource. + Raises: + RuntimeError if there are no active administrators for the current workspace. + """ + return self._maybe_direct_owner(record) or self._administrator_locator.get_workspace_administrator() + + @abstractmethod + def _maybe_direct_owner(self, record: Record) -> str | None: + """Obtain the record-specific user-name associated with the given record, if any.""" + return None diff --git a/src/databricks/labs/ucx/hive_metastore/grants.py b/src/databricks/labs/ucx/hive_metastore/grants.py index 8d6832627c..22b99fa992 100644 --- a/src/databricks/labs/ucx/hive_metastore/grants.py +++ b/src/databricks/labs/ucx/hive_metastore/grants.py @@ -31,6 +31,7 @@ StoragePermissionMapping, ) from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore import TablesCrawler from databricks.labs.ucx.hive_metastore.locations import ( @@ -382,6 +383,16 @@ def grants( return [] +class GrantOwnership(Ownership[Grant]): + """Determine ownership of grants in the inventory. + + At the present we can't determine a specific owner for grants. + """ + + def _maybe_direct_owner(self, record: Grant) -> None: + return None + + class AwsACL: def __init__( self, diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 283be4f717..bd96652962 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -8,8 +8,10 @@ from databricks.sdk.errors import NotFound from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore import TablesCrawler +from databricks.labs.ucx.hive_metastore.tables import Table, TableOwnership logger = logging.getLogger(__name__) @@ -151,3 +153,29 @@ def _iter_schemas(self): except NotFound: logger.warning(f"Catalog {catalog.name} no longer exists. Skipping checking its migration status.") continue + + +class TableMigrationOwnership(Ownership[TableMigrationStatus]): + """Determine ownership of table migration records in the inventory. + + This is the owner of the source table, if (and only if) the source table is present in the inventory. 
+ """ + + def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None: + super().__init__(table_ownership._administrator_locator) + self._tables_crawler = tables_crawler + self._table_ownership = table_ownership + self._indexed_tables: dict[tuple[str, str], Table] | None = None + + def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]: + index = self._indexed_tables + if index is None or reindex: + snapshot = self._tables_crawler.snapshot() + index = {(table.database, table.name): table for table in snapshot} + self._indexed_tables = index + return index + + def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None: + index = self._tables_snapshot_index() + source_table = index.get((record.src_schema, record.src_table), None) + return self._table_ownership.owner_of(source_table) if source_table is not None else None diff --git a/src/databricks/labs/ucx/hive_metastore/table_size.py b/src/databricks/labs/ucx/hive_metastore/table_size.py index 3e5c61f81c..243c4e3418 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_size.py +++ b/src/databricks/labs/ucx/hive_metastore/table_size.py @@ -4,12 +4,11 @@ from functools import partial from databricks.labs.blueprint.parallel import Threads -from databricks.labs.lsql.backends import SqlBackend from databricks.labs.ucx.framework.crawlers import CrawlerBase from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore import TablesCrawler -from databricks.labs.ucx.hive_metastore.tables import Table +from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table logger = logging.getLogger(__name__) @@ -23,20 +22,25 @@ class TableSize: class TableSizeCrawler(CrawlerBase[TableSize]): - def __init__(self, backend: SqlBackend, schema, include_databases: list[str] | None = None): + # TODO: Ensure TablesCrawler and FasterTableScanCrawler share a common interface. + def __init__(self, tables_crawler: TablesCrawler | FasterTableScanCrawler) -> None: """ Initializes a TablesSizeCrawler instance. Args: - backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark) - schema: The schema name for the inventory persistence. + tables_crawler (TablesCrawler): The crawler to use to obtain the table inventory. 
""" # pylint: disable-next=import-error,import-outside-toplevel from pyspark.sql.session import SparkSession # type: ignore[import-not-found] - self._backend = backend - super().__init__(backend, "hive_metastore", schema, "table_size", TableSize) - self._tables_crawler = TablesCrawler(backend, schema, include_databases) + super().__init__( + tables_crawler._backend, + "hive_metastore", + tables_crawler._schema, + "table_size", + TableSize, + ) + self._tables_crawler = tables_crawler self._spark = SparkSession.builder.getOrCreate() def _crawl(self) -> Iterable[TableSize]: diff --git a/src/databricks/labs/ucx/hive_metastore/tables.py b/src/databricks/labs/ucx/hive_metastore/tables.py index 43b848b938..31643604e8 100644 --- a/src/databricks/labs/ucx/hive_metastore/tables.py +++ b/src/databricks/labs/ucx/hive_metastore/tables.py @@ -16,6 +16,7 @@ from databricks.sdk.errors import NotFound from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier logger = logging.getLogger(__name__) @@ -636,3 +637,13 @@ def _create_describe_tasks(self, catalog: str, database: str, table_names: list[ for table in table_names: tasks.append(partial(self._describe, catalog, database, table)) return tasks + + +class TableOwnership(Ownership[Table]): + """Determine ownership of tables in the inventory. + + At the present we don't determine a specific owner for tables. + """ + + def _maybe_direct_owner(self, record: Table) -> None: + return None diff --git a/src/databricks/labs/ucx/hive_metastore/udfs.py b/src/databricks/labs/ucx/hive_metastore/udfs.py index 6ee1eefd38..74196c543c 100644 --- a/src/databricks/labs/ucx/hive_metastore/udfs.py +++ b/src/databricks/labs/ucx/hive_metastore/udfs.py @@ -8,6 +8,7 @@ from databricks.sdk.errors import Unknown, NotFound from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier logger = logging.getLogger(__name__) @@ -135,3 +136,13 @@ def _assess_udfs(udfs: Iterable[Udf]) -> Iterable[Udf]: yield replace(udf, success=0, failures="Only SCALAR functions are supported") else: yield replace(udf, success=1) + + +class UdfOwnership(Ownership[Udf]): + """Determine ownership of UDFs in the inventory. + + At the present we don't determine a specific owner for UDFs. + """ + + def _maybe_direct_owner(self, record: Udf) -> None: + return None diff --git a/src/databricks/labs/ucx/source_code/directfs_access.py b/src/databricks/labs/ucx/source_code/directfs_access.py index 26acf95215..f9d02bfb7d 100644 --- a/src/databricks/labs/ucx/source_code/directfs_access.py +++ b/src/databricks/labs/ucx/source_code/directfs_access.py @@ -7,6 +7,7 @@ from databricks.labs.lsql.backends import SqlBackend from databricks.sdk.errors import DatabricksError +from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.source_code.base import DirectFsAccess @@ -52,3 +53,19 @@ def _try_fetch(self) -> Iterable[DirectFsAccess]: def _crawl(self) -> Iterable[DirectFsAccess]: return [] # TODO raise NotImplementedError() once CrawlerBase supports empty snapshots + + +class DirectFsAccessOwnership(Ownership[DirectFsAccess]): + """Determine ownership of records reporting direct filesystem access. 
+ + This is intended to be: + + - For queries, the creator of the query (if known). + - For jobs, the owner of the path for the notebook or source (if known). + + At present this information is not gathered during the crawling process, so it can't be reported here. + """ + + def _maybe_direct_owner(self, record: DirectFsAccess) -> None: + # TODO: Implement this once the creator/ownership information is exposed during crawling. + return None diff --git a/tests/integration/assessment/test_clusters.py b/tests/integration/assessment/test_clusters.py index 01a47d1aba..8cf0622220 100644 --- a/tests/integration/assessment/test_clusters.py +++ b/tests/integration/assessment/test_clusters.py @@ -1,11 +1,17 @@ import json from datetime import timedelta +import pytest from databricks.sdk.errors import NotFound from databricks.sdk.retries import retried from databricks.sdk.service.compute import DataSecurityMode -from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler +from databricks.labs.ucx.assessment.clusters import ( + ClustersCrawler, + PoliciesCrawler, + ClusterOwnership, + ClusterPolicyOwnership, +) from .test_assessment import _SPARK_CONF @@ -39,6 +45,42 @@ def test_cluster_crawler_no_isolation(ws, make_cluster, inventory_schema, sql_ba assert results[0].failures == '["No isolation shared clusters not supported in UC"]' +def _change_cluster_owner(ws, cluster_id: str, owner_user_name: str) -> None: + """Replacement for ClustersAPI.change_owner().""" + # As of SDK 0.33.0 there is a call to wait for cluster termination that fails because it doesn't pass the cluster id + body = {'cluster_id': cluster_id, 'owner_username': owner_user_name} + headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} + ws.api_client.do('POST', '/api/2.1/clusters/change-owner', body=body, headers=headers) + + +def test_cluster_ownership(ws, runtime_ctx, make_cluster, make_user, inventory_schema, sql_backend) -> None: + """Verify the ownership can be determined for crawled clusters.""" + + # Set up two clusters: one with us as owner and one for a different user. + # TODO: Figure out how to clear the creator for a cluster. + # (Contrary to the documentation for the creator field, deleting the user doesn't clear it immediately and waiting + # for 10 min doesn't help: the UI reports no creator, but the REST API continues to report the deleted user.) + another_user = make_user() + my_cluster = make_cluster(single_node=True, spark_conf=_SPARK_CONF) + their_cluster = make_cluster(single_node=True, spark_conf=_SPARK_CONF) + ws.clusters.delete_and_wait(cluster_id=their_cluster.cluster_id) + _change_cluster_owner(ws, their_cluster.cluster_id, owner_user_name=another_user.user_name) + + # Produce the crawled records. + crawler = ClustersCrawler(ws, sql_backend, inventory_schema) + records = crawler.snapshot(force_refresh=True) + + # Find the crawled records for our clusters. + my_cluster_record = next(record for record in records if record.cluster_id == my_cluster.cluster_id) + their_cluster_record = next(record for record in records if record.cluster_id == their_cluster.cluster_id) + + # Verify ownership is as expected. 
+ administrator_locator = runtime_ctx.administrator_locator + ownership = ClusterOwnership(administrator_locator) + assert ownership.owner_of(my_cluster_record) == ws.current_user.me().user_name + assert ownership.owner_of(their_cluster_record) == another_user.user_name + + def test_cluster_crawler_mlr_no_isolation(ws, make_cluster, inventory_schema, sql_backend): created_cluster = make_cluster( data_security_mode=DataSecurityMode.NONE, spark_version='15.4.x-cpu-ml-scala2.12', num_workers=1 @@ -86,3 +128,25 @@ def test_policy_crawler(ws, make_cluster_policy, inventory_schema, sql_backend, assert results[1].policy_name == policy_2 assert results[1].success == 0 assert results[1].failures == '["Uses azure service principal credentials config in policy."]' + + +# TODO: Investigate whether this is a bug or something wrong with this fixture. +@pytest.mark.xfail(reason="Cluster policy creators always seem to be null.") +def test_cluster_policy_ownership(ws, runtime_ctx, make_cluster_policy, inventory_schema, sql_backend) -> None: + """Verify the ownership can be determined for crawled cluster policies.""" + + # Set up a cluster policy. + # Note: there doesn't seem to be a way to change the owner of a cluster policy, so we can't test policies without + # an owner. + policy = make_cluster_policy() + + # Produce the crawled records. + crawler = PoliciesCrawler(ws, sql_backend, inventory_schema) + records = crawler.snapshot(force_refresh=True) + + # Find the crawled record for our cluster policy. + policy_record = next(record for record in records if record.policy_id == policy.policy_id) + + # Verify ownership is as expected. + ownership = ClusterPolicyOwnership(runtime_ctx.administrator_locator) + assert ownership.owner_of(policy_record) == ws.current_user.me().user_name diff --git a/tests/integration/assessment/test_jobs.py b/tests/integration/assessment/test_jobs.py index 3a8ef8dac7..47fa6f1b81 100644 --- a/tests/integration/assessment/test_jobs.py +++ b/tests/integration/assessment/test_jobs.py @@ -7,7 +7,7 @@ from databricks.sdk.service.jobs import NotebookTask, RunTask from databricks.sdk.service.workspace import ImportFormat -from databricks.labs.ucx.assessment.jobs import JobsCrawler, SubmitRunsCrawler +from databricks.labs.ucx.assessment.jobs import JobOwnership, JobsCrawler, SubmitRunsCrawler from .test_assessment import _SPARK_CONF @@ -63,3 +63,22 @@ def test_job_run_crawler(ws, env_or_skip, inventory_schema, sql_backend): failures = job_run.failures continue assert failures and failures == "[]" + + +def test_job_ownership(ws, runtime_ctx, make_job, inventory_schema, sql_backend) -> None: + """Verify the ownership can be determined for crawled jobs.""" + + # Set up a job. + # Note: there doesn't seem to be a way to change the owner of a job, so we can't test jobs without an owner. + job = make_job() + + # Produce the crawled records. + crawler = JobsCrawler(ws, sql_backend, inventory_schema) + records = crawler.snapshot(force_refresh=True) + + # Find the crawled record for our pipeline. + job_record = next(record for record in records if record.job_id == str(job.job_id)) + + # Verify ownership is as expected. 
+ ownership = JobOwnership(runtime_ctx.administrator_locator) + assert ownership.owner_of(job_record) == ws.current_user.me().user_name diff --git a/tests/integration/assessment/test_pipelines.py b/tests/integration/assessment/test_pipelines.py index b416d83069..93f60c850f 100644 --- a/tests/integration/assessment/test_pipelines.py +++ b/tests/integration/assessment/test_pipelines.py @@ -3,7 +3,7 @@ from databricks.sdk.errors import NotFound from databricks.sdk.retries import retried -from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler +from databricks.labs.ucx.assessment.pipelines import PipelineOwnership, PipelinesCrawler from .test_assessment import _PIPELINE_CONF, _PIPELINE_CONF_WITH_SECRET, logger @@ -42,3 +42,23 @@ def test_pipeline_with_secret_conf_crawler(ws, make_pipeline, inventory_schema, assert len(results) >= 1 assert results[0].pipeline_id == created_pipeline.pipeline_id + + +def test_pipeline_ownership(ws, runtime_ctx, make_pipeline, inventory_schema, sql_backend) -> None: + """Verify the ownership can be determined for crawled pipelines.""" + + # Set up a pipeline. + # Note: there doesn't seem to be a way to change the owner of a pipeline, so we can't test pipelines without an + # owner. + pipeline = make_pipeline() + + # Produce the crawled records. + crawler = PipelinesCrawler(ws, sql_backend, inventory_schema) + records = crawler.snapshot(force_refresh=True) + + # Find the crawled record for our pipeline. + pipeline_record = next(record for record in records if record.pipeline_id == pipeline.pipeline_id) + + # Verify ownership is as expected. + ownership = PipelineOwnership(runtime_ctx.administrator_locator) + assert ownership.owner_of(pipeline_record) == ws.current_user.me().user_name diff --git a/tests/integration/framework/test_owners.py b/tests/integration/framework/test_owners.py new file mode 100644 index 0000000000..670d5817a2 --- /dev/null +++ b/tests/integration/framework/test_owners.py @@ -0,0 +1,8 @@ +from databricks.labs.ucx.contexts.workflow_task import RuntimeContext + + +def test_fallback_workspace_admin(installation_ctx: RuntimeContext) -> None: + """Verify that a workspace administrator can be found for our integration environment.""" + an_admin = installation_ctx.administrator_locator.get_workspace_administrator() + + assert "@" in an_admin diff --git a/tests/integration/hive_metastore/test_grants.py b/tests/integration/hive_metastore/test_grants.py index 6ab661264d..a89c0b94e1 100644 --- a/tests/integration/hive_metastore/test_grants.py +++ b/tests/integration/hive_metastore/test_grants.py @@ -6,6 +6,11 @@ from databricks.sdk.retries import retried from databricks.labs.lsql.backends import StatementExecutionBackend + +from databricks.labs.ucx.framework.utils import escape_sql_identifier +from databricks.labs.ucx.hive_metastore import TablesCrawler +from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler, GrantOwnership +from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler from ..conftest import MockRuntimeContext logger = logging.getLogger(__name__) @@ -108,3 +113,25 @@ def test_all_grants_for_other_objects( assert {"DENIED_SELECT"} == found_any_file_grants[group_b.display_name] assert {"SELECT"} == found_anonymous_function_grants[group_c.display_name] assert {"DENIED_SELECT"} == found_anonymous_function_grants[group_d.display_name] + + +def test_grant_ownership(ws, runtime_ctx, inventory_schema, sql_backend) -> None: + """Verify the ownership can be determined for crawled grants.""" + # This currently isn't 
very useful: we can't locate specific owners for grants. + + schema = runtime_ctx.make_schema() + this_user = ws.current_user.me() + sql_backend.execute(f"GRANT SELECT ON SCHEMA {escape_sql_identifier(schema.full_name)} TO `{this_user.user_name}`") + table_crawler = TablesCrawler(sql_backend, schema=inventory_schema, include_databases=[schema.name]) + udf_crawler = UdfsCrawler(sql_backend, schema=inventory_schema, include_databases=[schema.name]) + + # Produce the crawled records. + crawler = GrantsCrawler(table_crawler, udf_crawler, include_databases=[schema.name]) + records = crawler.snapshot(force_refresh=True) + + # Find the crawled record for the grant we made. + grant_record = next(record for record in records if record.this_type_and_key() == ("DATABASE", schema.full_name)) + + # Verify ownership can be made. + ownership = GrantOwnership(runtime_ctx.administrator_locator) + assert ownership.owner_of(grant_record) == runtime_ctx.administrator_locator.get_workspace_administrator() diff --git a/tests/integration/hive_metastore/test_table_migrate.py b/tests/integration/hive_metastore/test_table_migrate.py new file mode 100644 index 0000000000..e9ba362a86 --- /dev/null +++ b/tests/integration/hive_metastore/test_table_migrate.py @@ -0,0 +1,41 @@ +import dataclasses + +from databricks.labs.ucx.hive_metastore import TablesCrawler +from databricks.labs.ucx.hive_metastore.table_migration_status import ( + TableMigrationOwnership, + TableMigrationStatus, + TableMigrationStatusRefresher, +) +from databricks.labs.ucx.hive_metastore.tables import TableOwnership + + +def test_table_migration_ownership(ws, runtime_ctx, inventory_schema, sql_backend) -> None: + """Verify the ownership can be determined for crawled table-migration records.""" + + # A table for which a migration record will be produced. + table = runtime_ctx.make_table() + + # Use the crawlers to produce the migration record. + tables_crawler = TablesCrawler(sql_backend, schema=inventory_schema, include_databases=[table.schema_name]) + table_records = tables_crawler.snapshot(force_refresh=True) + migration_status_refresher = TableMigrationStatusRefresher(ws, sql_backend, table.schema_name, tables_crawler) + migration_records = migration_status_refresher.snapshot(force_refresh=True) + + # Find the crawled records for the table we made. + table_record = next(record for record in table_records if record.full_name == table.full_name) + + def is_migration_record_for_table(record: TableMigrationStatus) -> bool: + return record.src_schema == table.schema_name and record.src_table == table.name + + table_migration_record = next(record for record in migration_records if is_migration_record_for_table(record)) + # Make a synthetic record that doesn't correspond to anything in the inventory. + synthetic_record = dataclasses.replace(table_migration_record, src_table="does_not_exist") + + # Verify for the table that the table owner and the migration status are a match. + table_ownership = TableOwnership(runtime_ctx.administrator_locator) + table_migration_ownership = TableMigrationOwnership(tables_crawler, table_ownership) + assert table_migration_ownership.owner_of(table_migration_record) == table_ownership.owner_of(table_record) + + # Verify the owner of the migration record that corresponds to an unknown table. 
+ workspace_administrator = runtime_ctx.administrator_locator.get_workspace_administrator() + assert table_migration_ownership.owner_of(synthetic_record) == workspace_administrator diff --git a/tests/integration/hive_metastore/test_tables.py b/tests/integration/hive_metastore/test_tables.py index 2d4a372e54..efd554591a 100644 --- a/tests/integration/hive_metastore/test_tables.py +++ b/tests/integration/hive_metastore/test_tables.py @@ -5,7 +5,7 @@ from databricks.sdk.retries import retried from databricks.labs.ucx.hive_metastore import TablesCrawler -from databricks.labs.ucx.hive_metastore.tables import What +from databricks.labs.ucx.hive_metastore.tables import What, TableOwnership logger = logging.getLogger(__name__) @@ -86,3 +86,22 @@ def test_partitioned_tables(ws, sql_backend, make_schema, make_table): assert all_tables[f"{schema.full_name}.non_partitioned_delta"].is_partitioned is False assert all_tables[f"{schema.full_name}.partitioned_parquet"].is_partitioned is True assert all_tables[f"{schema.full_name}.non_partitioned_parquet"].is_partitioned is False + + +def test_table_ownership(runtime_ctx, inventory_schema, sql_backend) -> None: + """Verify the ownership can be determined for crawled tables.""" + # This currently isn't very useful: we don't currently locate specific owners for tables. + + # A table for which we'll determine the owner. + table = runtime_ctx.make_table() + + # Produce the crawled records + crawler = TablesCrawler(sql_backend, schema=inventory_schema, include_databases=[table.schema_name]) + records = crawler.snapshot(force_refresh=True) + + # Find the crawled record for the table we made. + table_record = next(record for record in records if record.full_name == table.full_name) + + # Verify ownership can be made. + ownership = TableOwnership(runtime_ctx.administrator_locator) + assert ownership.owner_of(table_record) == runtime_ctx.administrator_locator.get_workspace_administrator() diff --git a/tests/integration/hive_metastore/test_udfs.py b/tests/integration/hive_metastore/test_udfs.py index 692d0c0675..2107267f9d 100644 --- a/tests/integration/hive_metastore/test_udfs.py +++ b/tests/integration/hive_metastore/test_udfs.py @@ -4,7 +4,7 @@ from databricks.sdk.errors import NotFound from databricks.sdk.retries import retried -from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler +from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership logger = logging.getLogger(__name__) @@ -24,3 +24,22 @@ def test_describe_all_udfs_in_databases(ws, sql_backend, inventory_schema, make_ assert len(udfs) == 3 assert sum(udf.success for udf in udfs) == 2 # hive_udf should fail assert [udf.failures for udf in udfs if udf.key == hive_udf.full_name] == ["Only SCALAR functions are supported"] + + +def test_udf_ownership(runtime_ctx, inventory_schema, sql_backend) -> None: + """Verify the ownership can be determined for crawled UDFs.""" + # This currently isn't very useful: we don't currently locate specific owners for UDFs. + + # A UDF for which we'll determine the owner. + udf = runtime_ctx.make_udf() + + # Produce the crawled records + crawler = UdfsCrawler(sql_backend, schema=inventory_schema, include_databases=[udf.schema_name]) + records = crawler.snapshot(force_refresh=True) + + # Find the crawled record for the table we made. + udf_record = next(r for r in records if f"{r.catalog}.{r.database}.{r.name}" == udf.full_name) + + # Verify ownership can be made. 
+ ownership = UdfOwnership(runtime_ctx.administrator_locator) + assert ownership.owner_of(udf_record) == runtime_ctx.administrator_locator.get_workspace_administrator() diff --git a/tests/integration/source_code/test_directfs_access.py b/tests/integration/source_code/test_directfs_access.py new file mode 100644 index 0000000000..a462040614 --- /dev/null +++ b/tests/integration/source_code/test_directfs_access.py @@ -0,0 +1,65 @@ +import pytest + +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex +from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessOwnership +from databricks.labs.ucx.source_code.jobs import WorkflowLinter +from databricks.labs.ucx.source_code.queries import QueryLinter + + +@pytest.mark.xfail(reason="DirectFS access records don't currently include creator/owner information.") +def test_query_dfsa_ownership(runtime_ctx, make_query, make_dashboard, inventory_schema, sql_backend) -> None: + """Verify the ownership of a direct-fs record for a query.""" + + # A dashboard with a query that contains a direct filesystem reference. + query = make_query(sql_query="SELECT * from csv.`dbfs://some_folder/some_file.csv`") + dashboard = make_dashboard(query=query) + + # Produce a DFSA record for the query. + linter = QueryLinter( + runtime_ctx.workspace_client, + TableMigrationIndex([]), + runtime_ctx.directfs_access_crawler_for_queries, + runtime_ctx.used_tables_crawler_for_queries, + include_dashboard_ids=[dashboard.id], + ) + linter.refresh_report(sql_backend, inventory_schema) + + # Find a record for the query. + records = runtime_ctx.directfs_access_crawler_for_queries.snapshot() + query_record = next(record for record in records if record.source_id == f"{dashboard.id}/{query.id}") + + # Verify ownership can be made. + ownership = DirectFsAccessOwnership(runtime_ctx.administrator_locator) + assert ownership.owner_of(query_record) == runtime_ctx.workspace_client.current_user.me().user_name + + +@pytest.mark.xfail(reason="DirectFS access records don't currently include creator/owner information.") +def test_path_dfsa_ownership( + runtime_ctx, make_notebook, make_job, make_directory, inventory_schema, sql_backend +) -> None: + """Verify the ownership of a direct-fs record for a notebook/source path associated with a job.""" + + # A job with a notebook task that contains direct filesystem access. + notebook_source = b"display(spark.read.csv('/mnt/things/e/f/g'))" + notebook = make_notebook(path=f"{make_directory()}/notebook.py", content=notebook_source) + job = make_job(notebook_path=notebook) + + # Produce a DFSA record for the job. + linter = WorkflowLinter( + runtime_ctx.workspace_client, + runtime_ctx.dependency_resolver, + runtime_ctx.path_lookup, + TableMigrationIndex([]), + runtime_ctx.directfs_access_crawler_for_paths, + runtime_ctx.used_tables_crawler_for_paths, + include_job_ids=[job.job_id], + ) + linter.refresh_report(sql_backend, inventory_schema) + + # Find a record for our job. + records = runtime_ctx.directfs_access_crawler_for_paths.snapshot() + path_record = next(record for record in records if record.source_id == str(notebook)) + + # Verify ownership can be made. 
+ ownership = DirectFsAccessOwnership(runtime_ctx.administrator_locator) + assert ownership.owner_of(path_record) == runtime_ctx.workspace_client.current_user.me().user_name diff --git a/tests/unit/assessment/test_clusters.py b/tests/unit/assessment/test_clusters.py index 02956c6b75..c86c3f60f0 100644 --- a/tests/unit/assessment/test_clusters.py +++ b/tests/unit/assessment/test_clusters.py @@ -2,13 +2,21 @@ from unittest.mock import MagicMock, create_autospec, mock_open, patch import pytest -from databricks.labs.lsql import Row from databricks.labs.lsql.backends import MockBackend from databricks.sdk.errors import DatabricksError, InternalError, NotFound +from databricks.sdk.service.compute import ClusterDetails, Policy from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler -from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler +from databricks.labs.ucx.assessment.clusters import ( + ClustersCrawler, + PoliciesCrawler, + ClusterOwnership, + ClusterInfo, + ClusterPolicyOwnership, + PolicyInfo, +) from databricks.labs.ucx.framework.crawlers import SqlBackend +from databricks.labs.ucx.framework.owners import AdministratorLocator from .. import mock_workspace_client @@ -90,21 +98,27 @@ def test_cluster_init_script_check_dbfs(): def test_cluster_without_owner_should_have_empty_creator_name(): - ws = mock_workspace_client(cluster_ids=['simplest-autoscale']) - mockbackend = MockBackend() - ClustersCrawler(ws, mockbackend, "ucx").snapshot() - result = mockbackend.rows_written_for("hive_metastore.ucx.clusters", "overwrite") - assert result == [ - Row( + ws = mock_workspace_client() + ws.clusters.list.return_value = ( + ClusterDetails( + creator_user_name=None, cluster_id="simplest-autoscale", policy_id="single-user-with-spn", - cluster_name="Simplest Shared Autoscale", - creator=None, + cluster_name="Simplest Shard Autoscale", spark_version="13.3.x-cpu-ml-scala2.12", - success=1, - failures='[]', - ) - ] + ), + ClusterDetails( + creator_user_name="", + cluster_id="another-simple-autoscale", + policy_id="single-user-with-spn", + cluster_name="Another Simple Shard Autoscale", + spark_version="13.3.x-cpu-ml-scala2.12", + ), + ) + mockbackend = MockBackend() + ClustersCrawler(ws, mockbackend, "ucx").snapshot() + result = mockbackend.rows_written_for("hive_metastore.ucx.clusters", "overwrite") + assert [row["creator"] for row in result] == [None, None] def test_cluster_with_multiple_failures(): @@ -171,6 +185,27 @@ def test_unsupported_clusters(): assert result_set[0].failures == '["cluster type not supported : LEGACY_PASSTHROUGH"]' +def test_cluster_owner_creator() -> None: + admin_locator = create_autospec(AdministratorLocator) + + ownership = ClusterOwnership(admin_locator) + owner = ownership.owner_of(ClusterInfo(creator="bob", cluster_id="1", success=1, failures="[]")) + + assert owner == "bob" + admin_locator.get_workspace_administrator.assert_not_called() + + +def test_cluster_owner_creator_unknown() -> None: + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "an_admin" + + ownership = ClusterOwnership(admin_locator) + owner = ownership.owner_of(ClusterInfo(creator=None, cluster_id="1", success=1, failures="[]")) + + assert owner == "an_admin" + admin_locator.get_workspace_administrator.assert_called_once() + + def test_policy_crawler(): ws = mock_workspace_client( policy_ids=['single-user-with-spn', 'single-user-with-spn-policyid', 'single-user-with-spn-no-sparkversion'], @@ 
-184,6 +219,21 @@ def test_policy_crawler(): assert "Uses azure service principal credentials config in policy." in failures +def test_policy_crawler_creator(): + ws = mock_workspace_client() + ws.cluster_policies.list.return_value = ( + Policy(policy_id="1", definition="{}", name="foo", creator_user_name=None), + Policy(policy_id="2", definition="{}", name="bar", creator_user_name=""), + Policy(policy_id="3", definition="{}", name="baz", creator_user_name="bob"), + ) + result = PoliciesCrawler(ws, MockBackend(), "ucx").snapshot(force_refresh=True) + + expected_creators = [None, None, "bob"] + crawled_creators = [record.creator for record in result] + assert len(expected_creators) == len(crawled_creators) + assert set(expected_creators) == set(crawled_creators) + + def test_policy_try_fetch(): ws = mock_workspace_client(policy_ids=['single-user-with-spn-policyid']) mock_backend = MockBackend( @@ -220,3 +270,25 @@ def test_policy_without_failure(): crawler = PoliciesCrawler(ws, MockBackend(), "ucx") result_set = list(crawler.snapshot()) assert result_set[0].failures == '[]' + + +def test_cluster_policy_owner_creator() -> None: + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "an_admin" + + ownership = ClusterPolicyOwnership(admin_locator) + owner = ownership.owner_of(PolicyInfo(creator="bob", policy_id="1", policy_name="foo", success=1, failures="[]")) + + assert owner == "bob" + admin_locator.get_workspace_administrator.assert_not_called() + + +def test_cluster_policy_owner_creator_unknown() -> None: + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "an_admin" + + ownership = ClusterPolicyOwnership(admin_locator) + owner = ownership.owner_of(PolicyInfo(creator=None, policy_id="1", policy_name="foo", success=1, failures="[]")) + + assert owner == "an_admin" + admin_locator.get_workspace_administrator.assert_called_once() diff --git a/tests/unit/assessment/test_jobs.py b/tests/unit/assessment/test_jobs.py index 9b7240f73a..8ec3e89077 100644 --- a/tests/unit/assessment/test_jobs.py +++ b/tests/unit/assessment/test_jobs.py @@ -1,8 +1,11 @@ +from unittest.mock import create_autospec + import pytest -from databricks.labs.lsql import Row from databricks.labs.lsql.backends import MockBackend +from databricks.sdk.service.jobs import BaseJob, JobSettings -from databricks.labs.ucx.assessment.jobs import JobsCrawler, SubmitRunsCrawler +from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership, JobsCrawler, SubmitRunsCrawler +from databricks.labs.ucx.framework.owners import AdministratorLocator from .. 
import mock_workspace_client @@ -59,12 +62,19 @@ def test_jobs_assessment_with_spn_cluster_no_job_tasks(): assert result_set[0].success == 1 -def test_job_crawler_with_no_owner_should_have_empty_creator_name(): - ws = mock_workspace_client(job_ids=['no-tasks']) - sql_backend = MockBackend() - JobsCrawler(ws, sql_backend, "ucx").snapshot() - result = sql_backend.rows_written_for("hive_metastore.ucx.jobs", "overwrite") - assert result == [Row(job_id='9001', success=1, failures='[]', job_name='No Tasks', creator=None)] +def test_job_crawler_creator(): + ws = mock_workspace_client() + ws.jobs.list.return_value = ( + BaseJob(job_id=1, settings=JobSettings(), creator_user_name=None), + BaseJob(job_id=2, settings=JobSettings(), creator_user_name=""), + BaseJob(job_id=3, settings=JobSettings(), creator_user_name="bob"), + ) + result = JobsCrawler(ws, MockBackend(), "ucx").snapshot(force_refresh=True) + + expected_creators = [None, None, "bob"] + crawled_creators = [record.creator for record in result] + assert len(expected_creators) == len(crawled_creators) + assert set(expected_creators) == set(crawled_creators) @@ -123,3 +133,24 @@ def test_job_run_crawler(jobruns_ids, cluster_ids, run_ids, failures): assert len(result) == 1 assert result[0].run_ids == run_ids assert result[0].failures == failures + + +def test_job_owner_creator() -> None: + admin_locator = create_autospec(AdministratorLocator) + + ownership = JobOwnership(admin_locator) + owner = ownership.owner_of(JobInfo(creator="bob", job_id="1", success=1, failures="[]")) + + assert owner == "bob" + admin_locator.get_workspace_administrator.assert_not_called() + + +def test_job_owner_creator_unknown() -> None: + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "an_admin" + + ownership = JobOwnership(admin_locator) + owner = ownership.owner_of(JobInfo(creator=None, job_id="1", success=1, failures="[]")) + + assert owner == "an_admin" + admin_locator.get_workspace_administrator.assert_called_once() diff --git a/tests/unit/assessment/test_pipelines.py b/tests/unit/assessment/test_pipelines.py index b9a0acb0aa..949e441f78 100644 --- a/tests/unit/assessment/test_pipelines.py +++ b/tests/unit/assessment/test_pipelines.py @@ -1,8 +1,11 @@ -from databricks.labs.lsql import Row +from unittest.mock import create_autospec + from databricks.labs.lsql.backends import MockBackend +from databricks.sdk.service.pipelines import GetPipelineResponse, PipelineStateInfo from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler -from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler +from databricks.labs.ucx.assessment.pipelines import PipelineOwnership, PipelineInfo, PipelinesCrawler +from databricks.labs.ucx.framework.owners import AdministratorLocator from ..
import mock_workspace_client @@ -44,19 +47,38 @@ def test_pipeline_list_with_no_config(): assert len(crawler) == 0 -def test_pipeline_without_owners_should_have_empty_creator_name(): - ws = mock_workspace_client(pipeline_ids=['empty-spec']) - ws.dbfs.read().data = "JXNoCmVjaG8gIj0=" - mockbackend = MockBackend() - PipelinesCrawler(ws, mockbackend, "ucx").snapshot() - result = mockbackend.rows_written_for("hive_metastore.ucx.pipelines", "overwrite") - - assert result == [ - Row( - pipeline_id="empty-spec", - pipeline_name="New DLT Pipeline", - creator_name=None, - success=1, - failures="[]", - ) - ] +def test_pipeline_crawler_creator(): + ws = mock_workspace_client() + ws.pipelines.list_pipelines.return_value = ( + PipelineStateInfo(pipeline_id="1", creator_user_name=None), + PipelineStateInfo(pipeline_id="2", creator_user_name=""), + PipelineStateInfo(pipeline_id="3", creator_user_name="bob"), + ) + ws.pipelines.get = create_autospec(GetPipelineResponse) # pylint: disable=mock-no-usage + result = PipelinesCrawler(ws, MockBackend(), "ucx").snapshot(force_refresh=True) + + expected_creators = [None, None, "bob"] + crawled_creators = [record.creator_name for record in result] + assert len(expected_creators) == len(crawled_creators) + assert set(expected_creators) == set(crawled_creators) + + +def test_pipeline_owner_creator() -> None: + admin_locator = create_autospec(AdministratorLocator) + + ownership = PipelineOwnership(admin_locator) + owner = ownership.owner_of(PipelineInfo(creator_name="bob", pipeline_id="1", success=1, failures="[]")) + + assert owner == "bob" + admin_locator.get_workspace_administrator.assert_not_called() + + +def test_pipeline_owner_creator_unknown() -> None: + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "an_admin" + + ownership = PipelineOwnership(admin_locator) + owner = ownership.owner_of(PipelineInfo(creator_name=None, pipeline_id="1", success=1, failures="[]")) + + assert owner == "an_admin" + admin_locator.get_workspace_administrator.assert_called_once() diff --git a/tests/unit/framework/test_owners.py b/tests/unit/framework/test_owners.py new file mode 100644 index 0000000000..25dd465b6f --- /dev/null +++ b/tests/unit/framework/test_owners.py @@ -0,0 +1,345 @@ +import re +from collections.abc import Callable, Sequence +from unittest.mock import create_autospec, Mock + +import pytest +from databricks.sdk.errors import NotFound +from databricks.sdk.service import iam + +from databricks.labs.ucx.framework.owners import ( + AccountAdministratorFinder, + AdministratorFinder, + AdministratorLocator, + Ownership, + Record, + WorkspaceAdministratorFinder, +) + + +class _OwnershipFixture(Ownership[Record]): + def __init__( + self, + *, + owner_fn: Callable[[Record], str | None] = lambda _: None, + ): + mock_admin_locator = create_autospec(AdministratorLocator) # pylint: disable=mock-no-usage + super().__init__(mock_admin_locator) + self._owner_fn = owner_fn + self.mock_admin_locator = mock_admin_locator + + def _maybe_direct_owner(self, record: Record) -> str | None: + return self._owner_fn(record) + + +def _setup_accounts( + ws, + *, + account_users: Sequence[iam.User] = (), + workspace_users: Sequence[iam.User] = (), + groups: Sequence[iam.Group] = (), +) -> None: + # Stub for the workspace users. + ws.users.list.return_value = list(workspace_users) + + # Stub for the groups. 
+ groups_by_id = {group.id: group for group in groups} + + def stub_groups_get(group_id: str) -> iam.Group: + try: + return groups_by_id[group_id] + except KeyError as e: + msg = f"Group not found: {group_id}" + raise NotFound(msg) from e + + ws.groups.get.side_effect = stub_groups_get + ws.groups.list.return_value = groups + + # Stub for the account users. + def stub_rest_call(method: str, path: str | None = None, query: dict | None = None) -> dict: + if method == "GET" and path == "/api/2.0/account/scim/v2/Users" and query: + return {"Resources": [user.as_dict() for user in account_users]} + msg = f"Call not mocked: {method} {path}" + raise NotImplementedError(msg) + + ws.api_client.do.side_effect = stub_rest_call + + +def _create_workspace_admin(user_name: str, admins_group_id: str) -> iam.User: + return iam.User( + user_name=user_name, + active=True, + groups=[iam.ComplexValue(display="admins", ref=f"Groups/{admins_group_id}", value=admins_group_id)], + ) + + +def _create_account_admin(user_name: str) -> iam.User: + return iam.User(user_name=user_name, active=True, roles=[iam.ComplexValue(value="account_admin")]) + + +def _create_workspace_group(display_name: str, group_id: str) -> iam.Group: + return iam.Group(display_name=display_name, id=group_id, meta=iam.ResourceMeta(resource_type="WorkspaceGroup")) + + +def test_workspace_admin_finder_active_with_username(ws) -> None: + """Verify that the workspace admin finder only reports active users with a user-name.""" + admins_group = _create_workspace_group("admins", group_id="1") + inactive_admin = _create_workspace_admin("inactive_admin_1", admins_group_id="1") + inactive_admin.active = False + users = [ + _create_workspace_admin("only_real_admin", admins_group_id="1"), + inactive_admin, + _create_workspace_admin("", admins_group_id="1"), + ] + _setup_accounts(ws, workspace_users=users, groups=[admins_group]) + + finder = WorkspaceAdministratorFinder(ws) + admins = list(finder.find_admin_users()) + + assert [admin.user_name for admin in admins] == ["only_real_admin"] + + +def test_workspace_admin_finder_admins_members(ws) -> None: + """Verify that the workspace admin finder only reports members of the 'admins' workspace group.""" + groups = [ + _create_workspace_group("admins", group_id="1"), + _create_workspace_group("users", group_id="2"), + _create_workspace_group("not_admins", group_id="3"), + iam.Group(display_name="admins", id="4", meta=iam.ResourceMeta(resource_type="Group")), + ] + users = [ + _create_workspace_admin("admin_1", admins_group_id="1"), + iam.User( + user_name="admin_2", + active=True, + groups=[ + iam.ComplexValue(display="admins", ref="Groups/1", value="1"), + iam.ComplexValue(display="users", ref="Groups/2", value="2"), + ], + ), + iam.User( + user_name="not_admin_1", + active=True, + groups=[ + iam.ComplexValue(display="users", ref="Groups/2", value="2"), + iam.ComplexValue(display="not_admins", ref="Groups/3", value="3"), + ], + ), + iam.User( + user_name="not_admin_2", + active=True, + groups=[ + iam.ComplexValue(display="admins", ref="Groups/4", value="4"), + ], + ), + ] + _setup_accounts(ws, workspace_users=users, groups=groups) + + finder = WorkspaceAdministratorFinder(ws) + admins = list(finder.find_admin_users()) + + expected_admins = {"admin_1", "admin_2"} + assert len(admins) == len(expected_admins) + assert set(admin.user_name for admin in admins) == expected_admins + + +def test_workspace_admin_finder_no_admins(ws) -> None: + """Verify that the workspace admin finder handles no admins as a normal 
situation."""
+    admins_group = _create_workspace_group("admins", group_id="1")
+    _setup_accounts(ws, workspace_users=[], groups=[admins_group])
+
+    finder = WorkspaceAdministratorFinder(ws)
+    admins = list(finder.find_admin_users())
+
+    assert not admins
+
+
+def test_accounts_admin_finder_active_with_username(ws) -> None:
+    """Verify that the account admin finder only reports active users with a user-name."""
+    inactive_admin = _create_account_admin("inactive_admin")
+    inactive_admin.active = False
+    users = [
+        _create_account_admin("only_real_admin"),
+        inactive_admin,
+        _create_account_admin(""),
+    ]
+    _setup_accounts(ws, account_users=users)
+
+    finder = AccountAdministratorFinder(ws)
+    admins = list(finder.find_admin_users())
+
+    assert [admin.user_name for admin in admins] == ["only_real_admin"]
+
+
+def test_accounts_admin_finder_role(ws) -> None:
+    """Verify that the account admin finder only reports users with the 'account_admin' role."""
+    users = [
+        _create_account_admin("admin_1"),
+        iam.User(
+            user_name="admin_2",
+            active=True,
+            roles=[
+                iam.ComplexValue(value="account_admin"),
+                iam.ComplexValue(value="another_role"),
+            ],
+        ),
+        iam.User(
+            user_name="not_admin",
+            active=True,
+            roles=[
+                iam.ComplexValue(value="another_role"),
+            ],
+        ),
+    ]
+    _setup_accounts(ws, account_users=users)
+
+    finder = AccountAdministratorFinder(ws)
+    admins = list(finder.find_admin_users())
+
+    expected_admins = {"admin_1", "admin_2"}
+    assert len(admins) == len(expected_admins)
+    assert set(admin.user_name for admin in admins) == expected_admins
+
+
+def test_accounts_admin_finder_no_admins(ws) -> None:
+    """Verify that the account admin finder handles no admins as a normal situation."""
+    finder = AccountAdministratorFinder(ws)
+    admins = list(finder.find_admin_users())
+
+    assert not admins
+
+
+def test_admin_locator_prefers_workspace_admin_over_account_admin(ws) -> None:
+    """Verify that when both workspace and account administrators are configured, the workspace admin is preferred."""
+    admins_group = _create_workspace_group("admins", group_id="1")
+    assert admins_group.id
+    workspace_users = [_create_workspace_admin("bob", admins_group_id=admins_group.id)]
+    account_users = [_create_account_admin("jane")]
+    _setup_accounts(ws, account_users=account_users, workspace_users=workspace_users, groups=[admins_group])
+
+    locator = AdministratorLocator(ws)
+    the_admin = locator.get_workspace_administrator()
+
+    assert the_admin == "bob"
+    # Also verify that we didn't attempt to look up account admins.
+    ws.api_client.do.assert_not_called()
+
+
+def test_admin_locator_prefer_first_workspace_admin_alphabetically(ws) -> None:
+    """Verify that when multiple workspace administrators can be found, the first alphabetically is used."""
+    admins_group = _create_workspace_group("admins", group_id="1")
+    assert admins_group.id
+    workspace_users = [
+        _create_workspace_admin("bob", admins_group_id=admins_group.id),
+        _create_workspace_admin("andrew", admins_group_id=admins_group.id),
+        _create_workspace_admin("jane", admins_group_id=admins_group.id),
+    ]
+    _setup_accounts(ws, workspace_users=workspace_users, groups=[admins_group])
+
+    locator = AdministratorLocator(ws)
+    the_admin = locator.get_workspace_administrator()
+
+    assert the_admin == "andrew"
+
+
+def test_admin_locator_prefer_first_account_admin_alphabetically(ws) -> None:
+    """Verify that when multiple account administrators can be found, the first alphabetically is used."""
+    account_users = [
+        _create_account_admin("bob"),
+        _create_account_admin("andrew"),
+        _create_account_admin("jane"),
+    ]
+    _setup_accounts(ws, account_users=account_users)
+
+    locator = AdministratorLocator(ws)
+    the_admin = locator.get_workspace_administrator()
+
+    assert the_admin == "andrew"
+
+
+def test_admin_locator_error_when_no_admin(ws) -> None:
+    """Verify that an error is raised when no workspace or account administrators can be found."""
+    _setup_accounts(ws)
+
+    locator = AdministratorLocator(ws)
+    # No admins.
+    workspace_id = ws.get_workspace_id()
+    expected_message = f"No active workspace or account administrator can be found for workspace: {workspace_id}"
+    with pytest.raises(RuntimeError, match=re.escape(expected_message)):
+        _ = locator.get_workspace_administrator()
+
+
+def test_admin_locator_is_lazy(ws) -> None:
+    """Verify that we don't attempt to locate an administrator until it's needed."""
+    mock_finder = create_autospec(AdministratorFinder)
+    mock_finder.find_admin_users.return_value = (_create_account_admin("bob"),)
+    mock_finder_factory = Mock()
+    mock_finder_factory.return_value = mock_finder
+    locator = AdministratorLocator(ws, finders=[mock_finder_factory])
+
+    mock_finder_factory.assert_not_called()
+    mock_finder.assert_not_called()
+
+    _ = locator.get_workspace_administrator()
+
+    mock_finder_factory.assert_called_once_with(ws)
+    mock_finder.find_admin_users.assert_called_once()
+
+
+def test_admin_locator_caches_result(ws) -> None:
+    """Verify that locating an administrator only happens once."""
+    mock_finder = create_autospec(AdministratorFinder)
+    mock_finder.find_admin_users.return_value = (_create_account_admin("bob"),)
+    mock_finder_factory = Mock()
+    mock_finder_factory.return_value = mock_finder
+
+    locator = AdministratorLocator(ws, finders=[mock_finder_factory])
+    _ = locator.get_workspace_administrator()
+    _ = locator.get_workspace_administrator()
+
+    mock_finder_factory.assert_called_once_with(ws)
+    mock_finder.find_admin_users.assert_called_once()
+
+
+def test_admin_locator_caches_negative_result(ws) -> None:
+    """Verify that locating an administrator only happens once, even if it couldn't locate an admin."""
+    mock_finder = create_autospec(AdministratorFinder)
+    mock_finder.find_admin_users.return_value = ()
+    mock_finder_factory = Mock()
+    mock_finder_factory.return_value = mock_finder
+
+    locator = AdministratorLocator(ws, finders=[mock_finder_factory])
+    with pytest.raises(RuntimeError):
+        _ = locator.get_workspace_administrator()
+    with pytest.raises(RuntimeError):
+        _ = locator.get_workspace_administrator()
+ + mock_finder_factory.assert_called_once_with(ws) + mock_finder.find_admin_users.assert_called_once() + + +def test_ownership_prefers_record_owner() -> None: + """Verify that if an owner for the record can be found, that is used.""" + ownership = _OwnershipFixture[str](owner_fn=lambda _: "bob") + owner = ownership.owner_of("school") + + assert owner == "bob" + ownership.mock_admin_locator.get_workspace_administrator.assert_not_called() + + +def test_ownership_admin_user_fallback() -> None: + """Verify that if no owner for the record can be found, an admin user is returned instead.""" + ownership = _OwnershipFixture[str]() + ownership.mock_admin_locator.get_workspace_administrator.return_value = "jane" + + owner = ownership.owner_of("school") + + assert owner == "jane" + + +def test_ownership_no_fallback_admin_user_error() -> None: + """Verify that if no owner can be determined, an error is raised.""" + ownership = _OwnershipFixture[str]() + ownership.mock_admin_locator.get_workspace_administrator.side_effect = RuntimeError("Mocked admin lookup failure.") + + with pytest.raises(RuntimeError, match="Mocked admin lookup failure."): + _ = ownership.owner_of("school") diff --git a/tests/unit/hive_metastore/test_grants.py b/tests/unit/hive_metastore/test_grants.py index 101f1dd602..7f31824e02 100644 --- a/tests/unit/hive_metastore/test_grants.py +++ b/tests/unit/hive_metastore/test_grants.py @@ -4,7 +4,8 @@ import pytest from databricks.labs.lsql.backends import MockBackend -from databricks.labs.ucx.hive_metastore.grants import Grant, GrantsCrawler, MigrateGrants +from databricks.labs.ucx.framework.owners import AdministratorLocator +from databricks.labs.ucx.hive_metastore.grants import Grant, GrantsCrawler, MigrateGrants, GrantOwnership from databricks.labs.ucx.hive_metastore.tables import Table, TablesCrawler from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler from databricks.labs.ucx.workspace_access.groups import GroupManager @@ -527,3 +528,15 @@ def grant_loader() -> list[Grant]: in caplog.text ) group_manager.assert_not_called() + + +def test_grant_owner() -> None: + """Verify that the owner of a crawled grant is an administrator.""" + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "an_admin" + + ownership = GrantOwnership(admin_locator) + owner = ownership.owner_of(Grant(principal="someone", action_type="SELECT")) + + assert owner == "an_admin" + admin_locator.get_workspace_administrator.assert_called_once() diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index e713b6fa82..b9378875dc 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -2,12 +2,14 @@ import logging from itertools import cycle from unittest.mock import create_autospec + import pytest from databricks.labs.lsql.backends import MockBackend, SqlBackend from databricks.sdk import WorkspaceClient from databricks.sdk.errors import NotFound from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo, TableInfo +from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.hive_metastore import Mounts from databricks.labs.ucx.hive_metastore.grants import MigrateGrants from databricks.labs.ucx.hive_metastore.locations import Mount @@ -22,11 +24,13 @@ from databricks.labs.ucx.hive_metastore.table_migration_status import ( TableMigrationStatusRefresher, TableMigrationIndex, + 
TableMigrationOwnership, TableMigrationStatus, TableView, ) from databricks.labs.ucx.hive_metastore.tables import ( Table, + TableOwnership, TablesCrawler, What, ) @@ -1234,3 +1238,103 @@ def test_refresh_migration_status_published_remained_tables(caplog): assert 'remained-hive-metastore-table: hive_metastore.schema1.table3' in caplog.messages assert len(tables) == 1 and tables[0].key == "hive_metastore.schema1.table3" migrate_grants.assert_not_called() + + +def test_table_migration_status_owner() -> None: + admin_locator = create_autospec(AdministratorLocator) + + tables_crawler = create_autospec(TablesCrawler) + the_table = Table( + catalog="hive_metastore", + database="foo", + name="bar", + object_type="TABLE", + table_format="DELTA", + location="/some/path", + ) + tables_crawler.snapshot.return_value = [the_table] + table_ownership = create_autospec(TableOwnership) + table_ownership._administrator_locator = admin_locator # pylint: disable=protected-access + table_ownership.owner_of.return_value = "bob" + + ownership = TableMigrationOwnership(tables_crawler, table_ownership) + owner = ownership.owner_of( + TableMigrationStatus( + src_schema="foo", + src_table="bar", + dst_catalog="main", + dst_schema="foo", + dst_table="bar", + ) + ) + + assert owner == "bob" + tables_crawler.snapshot.assert_called_once() + table_ownership.owner_of.assert_called_once_with(the_table) + admin_locator.get_workspace_administrator.assert_not_called() + + +def test_table_migration_status_owner_caches_tables_snapshot() -> None: + """Verify that the tables inventory isn't loaded until needed, and after that isn't loaded repeatedly.""" + admin_locator = create_autospec(AdministratorLocator) # pylint: disable=mock-no-usage + + tables_crawler = create_autospec(TablesCrawler) + a_table = Table( + catalog="hive_metastore", + database="foo", + name="bar", + object_type="TABLE", + table_format="DELTA", + location="/some/path", + ) + b_table = Table( + catalog="hive_metastore", + database="baz", + name="daz", + object_type="TABLE", + table_format="DELTA", + location="/some/path", + ) + tables_crawler.snapshot.return_value = [a_table, b_table] + table_ownership = create_autospec(TableOwnership) + table_ownership._administrator_locator = admin_locator # pylint: disable=protected-access + table_ownership.owner_of.return_value = "bob" + + ownership = TableMigrationOwnership(tables_crawler, table_ownership) + + # Verify the snapshot() hasn't been loaded yet: it isn't needed. + tables_crawler.snapshot.assert_not_called() + + _ = ownership.owner_of( + TableMigrationStatus(src_schema="foo", src_table="bar", dst_catalog="main", dst_schema="foo", dst_table="bar"), + ) + _ = ownership.owner_of( + TableMigrationStatus(src_schema="baz", src_table="daz", dst_catalog="main", dst_schema="foo", dst_table="bar"), + ) + + # Verify the snapshot() wasn't reloaded for the second .owner_of() call. 
+ tables_crawler.snapshot.assert_called_once() + + +def test_table_migration_status_source_table_unknown() -> None: + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "an_admin" + + tables_crawler = create_autospec(TablesCrawler) + tables_crawler.snapshot.return_value = [] + table_ownership = create_autospec(TableOwnership) + table_ownership._administrator_locator = admin_locator # pylint: disable=protected-access + + ownership = TableMigrationOwnership(tables_crawler, table_ownership) + + unknown_table = TableMigrationStatus( + src_schema="foo", + src_table="bar", + dst_catalog="main", + dst_schema="foo", + dst_table="bar", + ) + owner = ownership.owner_of(unknown_table) + + assert owner == "an_admin" + table_ownership.owner_of.assert_not_called() diff --git a/tests/unit/hive_metastore/test_table_size.py b/tests/unit/hive_metastore/test_table_size.py index c9554d3e34..699a7019cf 100644 --- a/tests/unit/hive_metastore/test_table_size.py +++ b/tests/unit/hive_metastore/test_table_size.py @@ -3,6 +3,7 @@ from databricks.labs.lsql.backends import MockBackend +from databricks.labs.ucx.hive_metastore import TablesCrawler from databricks.labs.ucx.hive_metastore.table_size import TableSize, TableSizeCrawler # pylint: disable=protected-access @@ -31,7 +32,7 @@ def test_table_size_crawler(mocker) -> None: backend = MockBackend(rows=rows) pyspark_sql_session = mocker.Mock() sys.modules["pyspark.sql.session"] = pyspark_sql_session - tsc = TableSizeCrawler(backend, "inventory_database") + tsc = TableSizeCrawler(TablesCrawler(backend, "inventory_database")) tsc._spark._jsparkSession.table().queryExecution().analyzed().stats().sizeInBytes.return_value = 100 results = list(tsc.snapshot()) @@ -55,7 +56,7 @@ def test_table_size_unknown_error(mocker, caplog): backend = MockBackend(fails_on_first=errors, rows=rows) pyspark_sql_session = mocker.Mock() sys.modules["pyspark.sql.session"] = pyspark_sql_session - tsc = TableSizeCrawler(backend, "inventory_database") + tsc = TableSizeCrawler(TablesCrawler(backend, "inventory_database")) tsc._spark._jsparkSession.table().queryExecution().analyzed().stats().sizeInBytes.side_effect = Exception(...) 
with caplog.at_level(logging.WARNING): @@ -76,7 +77,7 @@ def test_table_size_table_or_view_not_found(mocker, caplog): backend = MockBackend(fails_on_first=errors, rows=rows) pyspark_sql_session = mocker.Mock() sys.modules["pyspark.sql.session"] = pyspark_sql_session - tsc = TableSizeCrawler(backend, "inventory_database") + tsc = TableSizeCrawler(TablesCrawler(backend, "inventory_database")) # table removed after crawling tsc._spark._jsparkSession.table().queryExecution().analyzed().stats().sizeInBytes.side_effect = Exception( @@ -102,7 +103,7 @@ def test_table_size_delta_table_not_found(mocker, caplog): backend = MockBackend(fails_on_first=errors, rows=rows) pyspark_sql_session = mocker.Mock() sys.modules["pyspark.sql.session"] = pyspark_sql_session - tsc = TableSizeCrawler(backend, "inventory_database") + tsc = TableSizeCrawler(TablesCrawler(backend, "inventory_database")) # table removed after crawling tsc._spark._jsparkSession.table().queryExecution().analyzed().stats().sizeInBytes.side_effect = Exception( @@ -128,7 +129,7 @@ def test_table_size_when_table_corrupted(mocker, caplog): backend = MockBackend(fails_on_first=errors, rows=rows) pyspark_sql_session = mocker.Mock() sys.modules["pyspark.sql.session"] = pyspark_sql_session - tsc = TableSizeCrawler(backend, "inventory_database") + tsc = TableSizeCrawler(TablesCrawler(backend, "inventory_database")) tsc._spark._jsparkSession.table().queryExecution().analyzed().stats().sizeInBytes.side_effect = Exception( "[DELTA_MISSING_TRANSACTION_LOG]" @@ -153,7 +154,7 @@ def test_table_size_when_delta_invalid_format_error(mocker, caplog): backend = MockBackend(fails_on_first=errors, rows=rows) pyspark_sql_session = mocker.Mock() sys.modules["pyspark.sql.session"] = pyspark_sql_session - tsc = TableSizeCrawler(backend, "inventory_database") + tsc = TableSizeCrawler(TablesCrawler(backend, "inventory_database")) tsc._spark._jsparkSession.table().queryExecution().analyzed().stats().sizeInBytes.side_effect = Exception( "[DELTA_INVALID_FORMAT]" diff --git a/tests/unit/hive_metastore/test_tables.py b/tests/unit/hive_metastore/test_tables.py index 06dec56bee..440bdcc597 100644 --- a/tests/unit/hive_metastore/test_tables.py +++ b/tests/unit/hive_metastore/test_tables.py @@ -1,11 +1,20 @@ import logging import sys +from unittest.mock import create_autospec import pytest from databricks.labs.lsql.backends import MockBackend +from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.hive_metastore.locations import Mount, ExternalLocations -from databricks.labs.ucx.hive_metastore.tables import Table, TablesCrawler, What, HiveSerdeType, FasterTableScanCrawler +from databricks.labs.ucx.hive_metastore.tables import ( + FasterTableScanCrawler, + HiveSerdeType, + Table, + TableOwnership, + TablesCrawler, + What, +) def test_is_delta_true(): @@ -326,10 +335,7 @@ def test_is_partitioned_flag(): ], } backend = MockBackend(rows=rows) - tables_crawler = TablesCrawler( - backend, - "default", - ) + tables_crawler = TablesCrawler(backend, "default") results = tables_crawler.snapshot() assert len(results) == 2 assert ( @@ -653,3 +659,16 @@ def test_fast_table_scan_crawler_crawl_test_warnings_get_table(caplog, mocker, s with caplog.at_level(logging.WARNING): ftsc.snapshot() assert "Test getTable warning" in caplog.text + + +def test_table_owner() -> None: + """Verify that the owner of a crawled table is an administrator.""" + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = 
"an_admin" + + ownership = TableOwnership(admin_locator) + table = Table(catalog="main", database="foo", name="bar", object_type="TABLE", table_format="DELTA") + owner = ownership.owner_of(table) + + assert owner == "an_admin" + admin_locator.get_workspace_administrator.assert_called_once() diff --git a/tests/unit/hive_metastore/test_udfs.py b/tests/unit/hive_metastore/test_udfs.py index b3ba27a63e..d1c87d66ad 100644 --- a/tests/unit/hive_metastore/test_udfs.py +++ b/tests/unit/hive_metastore/test_udfs.py @@ -1,6 +1,9 @@ +from unittest.mock import create_autospec + from databricks.labs.lsql.backends import MockBackend -from databricks.labs.ucx.hive_metastore.udfs import Udf, UdfsCrawler +from databricks.labs.ucx.framework.owners import AdministratorLocator +from databricks.labs.ucx.hive_metastore.udfs import Udf, UdfsCrawler, UdfOwnership def test_key(): @@ -43,3 +46,27 @@ def test_tables_crawler_should_filter_by_database(): udf_crawler = UdfsCrawler(backend, "default", ["database"]) results = udf_crawler.snapshot() assert len(results) == 1 + + +def test_udf_owner() -> None: + """Verify that the owner of a crawled UDF is an administrator.""" + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "an_admin" + + ownership = UdfOwnership(admin_locator) + udf = Udf( + catalog="main", + database="foo", + name="bar", + func_type="UNKNOWN", + func_input="UNKNOWN", + func_returns="UNKNOWN", + deterministic=True, + data_access="UNKNOWN", + body="UNKNOWN", + comment="UNKNOWN", + ) + owner = ownership.owner_of(udf) + + assert owner == "an_admin" + admin_locator.get_workspace_administrator.assert_called_once() diff --git a/tests/unit/source_code/test_directfs_access.py b/tests/unit/source_code/test_directfs_access.py index 0c1063b820..c02ad07315 100644 --- a/tests/unit/source_code/test_directfs_access.py +++ b/tests/unit/source_code/test_directfs_access.py @@ -1,11 +1,14 @@ from datetime import datetime +from unittest.mock import create_autospec from databricks.labs.lsql.backends import MockBackend +from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.source_code.base import LineageAtom from databricks.labs.ucx.source_code.directfs_access import ( DirectFsAccessCrawler, DirectFsAccess, + DirectFsAccessOwnership, ) @@ -30,3 +33,16 @@ def test_crawler_appends_dfsas(): crawler.dump_all(dfsas) rows = backend.rows_written_for(crawler.full_name, "append") assert len(rows) == 3 + + +def test_directfs_access_ownership() -> None: + """Verify that the owner for a direct-fs access record is an administrator.""" + admin_locator = create_autospec(AdministratorLocator) + admin_locator.get_workspace_administrator.return_value = "an_admin" + + ownership = DirectFsAccessOwnership(admin_locator) + dfsa = DirectFsAccess() + owner = ownership.owner_of(dfsa) + + assert owner == "an_admin" + admin_locator.get_workspace_administrator.assert_called_once() diff --git a/tests/unit/workspace_access/test_manager.py b/tests/unit/workspace_access/test_manager.py index 8a1d7d85cc..327defda1a 100644 --- a/tests/unit/workspace_access/test_manager.py +++ b/tests/unit/workspace_access/test_manager.py @@ -4,7 +4,6 @@ import pytest from databricks.labs.lsql import Row from databricks.labs.lsql.backends import MockBackend -from databricks.sdk import WorkspaceClient from databricks.sdk.errors import DatabricksError from databricks.sdk.service import iam @@ -286,8 +285,7 @@ def test_manager_verify_no_tasks(): assert result -def 
test_manager_apply_experimental_no_tasks(caplog): - ws = create_autospec(WorkspaceClient) +def test_manager_apply_experimental_no_tasks(ws, caplog): group_migration_state = MigrationState([]) with caplog.at_level("INFO"):