Crawler: support for object ownership #2774


Merged: 66 commits, Oct 9, 2024
Changes from 1 commit
573fca2
Crawler support for object ownership.
asnare Oct 1, 2024
b8f7e69
Move the ownership code into its own module, and stub unit tests.
asnare Oct 3, 2024
07fa875
Skip users that don't have a user-name.
asnare Oct 3, 2024
28daa7e
Sort by the user-name attribute, not name.
asnare Oct 3, 2024
f4e247e
Materialize list earlier, to aid debugging.
asnare Oct 3, 2024
d0c22db
Documentation references for how administrators are marked for worksp…
asnare Oct 3, 2024
467f912
Ensure that unit tests reset the (class-level) cache before they start.
asnare Oct 3, 2024
33cb841
Fix mock workspace identifier to have the correct type.
asnare Oct 3, 2024
3a1868c
Trivial integration test for locating an administrator.
asnare Oct 3, 2024
ec23bb0
Start implementing unit tests for the Ownership component.
asnare Oct 3, 2024
b9dd2a3
Refactor fixture code for mocking accounts and groups.
asnare Oct 3, 2024
a6b46c1
Merge branch 'main' into inventory-object-owners
asnare Oct 3, 2024
57bf8c3
Revert plumbing the workspace client into CrawlerBase
asnare Oct 3, 2024
7db7aa0
More reverting.
asnare Oct 3, 2024
7676f7c
Whitespace.
asnare Oct 3, 2024
7010237
Implement more unit tests.
asnare Oct 3, 2024
d1e24eb
Refactor workspace/account admin lookup into separate components.
asnare Oct 3, 2024
9155e19
Update integration test for locating a workspace admin to test the lo…
asnare Oct 4, 2024
9980c20
Refactor unit tests for the ownership-related classes.
asnare Oct 4, 2024
fded489
Merge branch 'main' into inventory-object-owners
asnare Oct 4, 2024
53da23d
Deal with some comprehension issues.
asnare Oct 4, 2024
83044e8
Implement ownership for the ClusterInfo inventory class.
asnare Oct 4, 2024
ed38942
Docstring for cluster ownership.
asnare Oct 4, 2024
5d4c994
Check some mock interactions.
asnare Oct 4, 2024
348d9b0
Improve docstring clarity.
asnare Oct 4, 2024
5cf6f30
Fix unit test.
asnare Oct 4, 2024
8d4de1f
Suppress pylint false positive.
asnare Oct 4, 2024
4a597a2
Implement ownership for cluster policies.
asnare Oct 4, 2024
1818d46
Fix linting suppression.
asnare Oct 4, 2024
c13b4eb
Use runtime context for integration tests instead of installation con…
asnare Oct 4, 2024
7e66e70
Fix xfail marker for integration test.
asnare Oct 4, 2024
f7942aa
Implement ownership for grants.
asnare Oct 4, 2024
3cb9abf
Use a longer variable name.
asnare Oct 4, 2024
2120322
Whitespace.
asnare Oct 4, 2024
e2189f4
Ownership implementation for tables.
asnare Oct 4, 2024
0d8e48b
Ownership implementation of UDFs.
asnare Oct 4, 2024
28ab56a
Ensure fewer unnecessary mock interactions.
asnare Oct 4, 2024
cc7db1c
Ownership implementation for pipelines.
asnare Oct 4, 2024
0dd3f11
Merge branch 'main' into inventory-object-owners
asnare Oct 4, 2024
64048e1
Merge branch 'main' into inventory-object-owners
asnare Oct 7, 2024
8f0265f
Ownership implementation for Jobs.
asnare Oct 7, 2024
8b944b3
Remove the workspace client from the ownership initializer.
asnare Oct 7, 2024
86582ad
Ownership implementation for the table migration status records.
asnare Oct 7, 2024
b2e66f2
Integration test for table migration ownership.
asnare Oct 7, 2024
953ff62
Stubbed ownership implementation for direct filesystem access records.
asnare Oct 7, 2024
7037b6a
Remove unintentional comment.
asnare Oct 7, 2024
8282051
Type hint.
asnare Oct 7, 2024
3d769c6
Rename: admin_groups -> admin_group_ids
asnare Oct 7, 2024
3c0a5b4
Fix failing integration test.
asnare Oct 7, 2024
9b39e30
Revert a change from this PR.
asnare Oct 7, 2024
7029bf9
Merge branch 'main' into inventory-object-owners
asnare Oct 8, 2024
8d8191d
Simplify creator normalisation.
asnare Oct 8, 2024
94a601d
Rename method: _get_owner() -> _maybe_direct_owner()
asnare Oct 8, 2024
b627890
Simplify the code a bit for locating members of the 'admins' workspac…
asnare Oct 8, 2024
ae8d194
Replace a property (with expensive side-effects) with a getter method.
asnare Oct 8, 2024
a6b5da0
Avoid exposing the admin-finder on the ownership interface.
asnare Oct 8, 2024
33d9c13
Refactor a sequence of generator comprehensions into a for-loop for r…
asnare Oct 8, 2024
d677179
Merge branch 'main' into inventory-object-owners
asnare Oct 8, 2024
b68a6c4
Merge branch 'main' into inventory-object-owners
asnare Oct 9, 2024
c6de109
Remove documentation that the ownership classes report an admin user …
asnare Oct 9, 2024
4deaf93
Docstring improvements.
asnare Oct 9, 2024
47d5343
Fix incorrect term: result -> record
asnare Oct 9, 2024
d74c241
Remove property.
asnare Oct 9, 2024
736ecfb
Merge branch 'main' into inventory-object-owners
asnare Oct 9, 2024
409bdf9
Update some ownership integration tests to verify the complete admin …
asnare Oct 9, 2024
215a1be
Update integration test for cluster ownership.
asnare Oct 9, 2024
3 changes: 1 addition & 2 deletions src/databricks/labs/ucx/assessment/azure.py
@@ -42,8 +42,7 @@ class ServicePrincipalClusterMapping:

class AzureServicePrincipalCrawler(CrawlerBase[AzureServicePrincipalInfo], JobsMixin, SecretsMixin):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
super().__init__(sbe, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo)
self._ws = ws
super().__init__(ws, sbe, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo)

def _try_fetch(self) -> Iterable[AzureServicePrincipalInfo]:
for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"):
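The same two-line change recurs across all the crawler modules in this PR: each subclass stops assigning `self._ws` itself and instead forwards the workspace client to the base initializer. A minimal sketch of the pattern, using stand-in classes rather than the real Databricks SDK types:

```python
# Hypothetical, simplified versions of the classes in this PR; the real
# CrawlerBase also takes a SqlBackend and a record dataclass.

class WorkspaceClient:  # stand-in for databricks.sdk.WorkspaceClient
    pass

class CrawlerBase:
    def __init__(self, ws: WorkspaceClient, catalog: str, schema: str, table: str):
        self._ws = ws  # previously each subclass assigned self._ws on its own
        self._catalog = catalog
        self._schema = schema
        self._table = table

class ClustersCrawler(CrawlerBase):
    def __init__(self, ws: WorkspaceClient, schema: str):
        # After the refactor the subclass simply forwards `ws` to the base class.
        super().__init__(ws, "hive_metastore", schema, "clusters")

crawler = ClustersCrawler(WorkspaceClient(), "inventory")
```

Centralising the client in the base class is what lets `CrawlerBase` offer workspace-level services (such as the admin lookup below) to every crawler.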
6 changes: 2 additions & 4 deletions src/databricks/labs/ucx/assessment/clusters.py
@@ -143,8 +143,7 @@ def _check_cluster_failures(self, cluster: ClusterDetails, source: str) -> list[

class ClustersCrawler(CrawlerBase[ClusterInfo], CheckClusterMixin):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema: str):
super().__init__(sbe, "hive_metastore", schema, "clusters", ClusterInfo)
self._ws = ws
super().__init__(ws, sbe, "hive_metastore", schema, "clusters", ClusterInfo)

def _crawl(self) -> Iterable[ClusterInfo]:
all_clusters = list(self._ws.clusters.list())
@@ -192,8 +191,7 @@ class PolicyInfo:

class PoliciesCrawler(CrawlerBase[PolicyInfo], CheckClusterMixin):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
super().__init__(sbe, "hive_metastore", schema, "policies", PolicyInfo)
self._ws = ws
super().__init__(ws, sbe, "hive_metastore", schema, "policies", PolicyInfo)

def _crawl(self) -> Iterable[PolicyInfo]:
all_policices = list(self._ws.cluster_policies.list())
3 changes: 1 addition & 2 deletions src/databricks/labs/ucx/assessment/init_scripts.py
@@ -42,8 +42,7 @@ def check_init_script(self, init_script_data: str | None, source: str) -> list[s

class GlobalInitScriptCrawler(CrawlerBase[GlobalInitScriptInfo], CheckInitScriptMixin):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
super().__init__(sbe, "hive_metastore", schema, "global_init_scripts", GlobalInitScriptInfo)
self._ws = ws
super().__init__(ws, sbe, "hive_metastore", schema, "global_init_scripts", GlobalInitScriptInfo)

def _crawl(self) -> Iterable[GlobalInitScriptInfo]:
all_global_init_scripts = list(self._ws.global_init_scripts.list())
6 changes: 2 additions & 4 deletions src/databricks/labs/ucx/assessment/jobs.py
@@ -72,8 +72,7 @@ def _job_clusters(job):

class JobsCrawler(CrawlerBase[JobInfo], JobsMixin, CheckClusterMixin):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
super().__init__(sbe, "hive_metastore", schema, "jobs", JobInfo)
self._ws = ws
super().__init__(ws, sbe, "hive_metastore", schema, "jobs", JobInfo)

def _crawl(self) -> Iterable[JobInfo]:
all_jobs = list(self._ws.jobs.list(expand_tasks=True))
@@ -159,8 +158,7 @@ class SubmitRunsCrawler(CrawlerBase[SubmitRunInfo], JobsMixin, CheckClusterMixin
]

def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema: str, num_days_history: int):
super().__init__(sbe, "hive_metastore", schema, "submit_runs", SubmitRunInfo)
self._ws = ws
super().__init__(ws, sbe, "hive_metastore", schema, "submit_runs", SubmitRunInfo)
self._num_days_history = num_days_history

@staticmethod
3 changes: 1 addition & 2 deletions src/databricks/labs/ucx/assessment/pipelines.py
@@ -24,8 +24,7 @@ class PipelineInfo:

class PipelinesCrawler(CrawlerBase[PipelineInfo], CheckClusterMixin):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
super().__init__(sbe, "hive_metastore", schema, "pipelines", PipelineInfo)
self._ws = ws
super().__init__(ws, sbe, "hive_metastore", schema, "pipelines", PipelineInfo)

def _crawl(self) -> Iterable[PipelineInfo]:
all_pipelines = list(self._ws.pipelines.list_pipelines())
20 changes: 16 additions & 4 deletions src/databricks/labs/ucx/contexts/application.py
@@ -201,6 +201,7 @@ def legacy_table_acl_support(self):
@cached_property
def permission_manager(self):
return PermissionManager(
self.workspace_client,
self.sql_backend,
self.inventory_database,
[
@@ -232,11 +233,21 @@ def grants_crawler(self):

@cached_property
def udfs_crawler(self):
return UdfsCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)
return UdfsCrawler(
self.workspace_client,
self.sql_backend,
self.inventory_database,
self.config.include_databases,
)

@cached_property
def tables_crawler(self):
return TablesCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)
return TablesCrawler(
self.workspace_client,
self.sql_backend,
self.inventory_database,
self.config.include_databases,
)

@cached_property
def tables_migrator(self):
@@ -443,11 +454,11 @@ def query_linter(self):

@cached_property
def directfs_access_crawler_for_paths(self):
return DirectFsAccessCrawler.for_paths(self.sql_backend, self.inventory_database)
return DirectFsAccessCrawler.for_paths(self.workspace_client, self.sql_backend, self.inventory_database)

@cached_property
def directfs_access_crawler_for_queries(self):
return DirectFsAccessCrawler.for_queries(self.sql_backend, self.inventory_database)
return DirectFsAccessCrawler.for_queries(self.workspace_client, self.sql_backend, self.inventory_database)

@cached_property
def redash(self):
@@ -476,6 +487,7 @@ def data_comparator(self):
@cached_property
def migration_recon(self):
return MigrationRecon(
self.workspace_client,
self.sql_backend,
self.inventory_database,
self.migration_status_refresher,
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/contexts/workflow_task.py
@@ -72,7 +72,7 @@ def pipelines_crawler(self):

@cached_property
def table_size_crawler(self):
return TableSizeCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)
return TableSizeCrawler(self.tables_crawler)

@cached_property
def policies_crawler(self):
@@ -84,7 +84,7 @@ def global_init_scripts_crawler(self):

@cached_property
def tables_crawler(self):
return FasterTableScanCrawler(self.sql_backend, self.inventory_database)
return FasterTableScanCrawler(self.workspace_client, self.sql_backend, self.inventory_database)

@cached_property
def tables_in_mounts(self):
66 changes: 63 additions & 3 deletions src/databricks/labs/ucx/framework/crawlers.py
@@ -1,12 +1,14 @@
import logging
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Sequence
from typing import ClassVar, Generic, Literal, Protocol, TypeVar
from functools import cached_property
from typing import ClassVar, Generic, Literal, Protocol, TypeVar, final

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound

from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.framework.utils import escape_sql_identifier, find_an_admin

logger = logging.getLogger(__name__)

@@ -21,17 +23,25 @@ class DataclassInstance(Protocol):


class CrawlerBase(ABC, Generic[Result]):
def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]):

_cached_workspace_admins: dict[int, str | RuntimeError] = {}
"""Cached user names of workspace administrators, keyed by workspace id."""

def __init__(
self, ws: WorkspaceClient, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]
):
"""
Initializes a CrawlerBase instance.

Args:
ws (WorkspaceClient): A client for the current workspace.
backend (SqlBackend): The backend that executes SQL queries:
Statement Execution API or Databricks Runtime.
catalog (str): The catalog name for the inventory persistence.
schema: The schema name for the inventory persistence.
table: The table name for the inventory persistence.
"""
self._ws = ws
self._catalog = self._valid(catalog)
self._schema = self._valid(schema)
self._table = self._valid(table)
@@ -107,6 +117,56 @@ def snapshot(self, *, force_refresh: bool = False) -> Iterable[Result]:
"""
return self._snapshot(self._try_fetch, self._crawl, force_refresh=force_refresh)

@final
def owner_of(self, result: Result) -> str:
"""Obtain the user-name of a user that is responsible for the given record.

This is intended to be a point of contact, and is either:

- The user that originally created the resource associated with the result; or
- An active administrator for the current workspace.

Args:
result (Result): The record for which an associated user-name is sought.
Returns:
A string containing the user-name attribute of the user considered to own the resource.
Raises:
RuntimeError if there are no active administrators for the current workspace.
"""
return self._result_owner(result) or self._workspace_admin

@cached_property
def _workspace_admin(self) -> str:
# Avoid repeatedly hitting the shared cache.
return self._find_administrator_for(self._ws)

@classmethod
@final
def _find_administrator_for(cls, ws: WorkspaceClient) -> str:
# Finding an administrator is quite expensive, so we ensure that for a given workspace we only
# do it once.
workspace_id = ws.get_workspace_id()
found_admin_or_error = cls._cached_workspace_admins.get(workspace_id, None)
if isinstance(found_admin_or_error, str):
return found_admin_or_error
if isinstance(found_admin_or_error, RuntimeError):
raise found_admin_or_error

found_admin = find_an_admin(ws)
if found_admin is None or not found_admin.user_name:
msg = f"No active workspace or account administrator can be found for workspace: {workspace_id}"
error = RuntimeError(msg)
cls._cached_workspace_admins[workspace_id] = error
raise error
user_name = found_admin.user_name
cls._cached_workspace_admins[workspace_id] = user_name
return user_name

@classmethod
def _result_owner(cls, result: Result) -> str | None: # pylint: disable=unused-argument
"""Obtain the record-specific user-name associated with the given result, if any."""
return None

@abstractmethod
def _try_fetch(self) -> Iterable[Result]:
"""Fetch existing data that has (previously) been crawled by this crawler.
55 changes: 55 additions & 0 deletions src/databricks/labs/ucx/framework/utils.py
@@ -1,5 +1,11 @@
import functools
import logging
import subprocess
from collections.abc import Iterable

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.iam import User


logger = logging.getLogger(__name__)

@@ -22,6 +28,55 @@ def escape_sql_identifier(path: str, *, maxsplit: int = 2) -> str:
return ".".join(escaped)


def _has_role(user: User, role: str) -> bool:
return user.roles is not None and any(r.value == role for r in user.roles)


def find_workspace_admins(ws: WorkspaceClient) -> Iterable[User]:
"""Enumerate the active workspace administrators in a given workspace.

Arguments:
ws (WorkspaceClient): The client for the workspace whose administrators should be enumerated.
Returns:
Iterable[User]: The active workspace administrators, if any.
"""
all_users = ws.users.list(attributes="id,active,userName,roles")
return (user for user in all_users if user.active and _has_role(user, "workspace_admin"))
Collaborator: do we have that role and it's no longer the admins group membership?

Author (Contributor): No, this was just a guess while I figured out how it's supposed to be done. (And indeed, it now uses membership of the admins workspace group.)


def find_account_admins(ws: WorkspaceClient) -> Iterable[User]:
"""Enumerate the active account administrators associated with a given workspace.

Arguments:
ws (WorkspaceClient): The client for the workspace whose account administrators should be enumerated.
Returns:
Iterable[User]: The active account administrators, if any.
"""
response = ws.api_client.do(
"GET", "/api/2.0/account/scim/v2/Users", query={"attributes": "id,active,userName,roles"}
)
assert isinstance(response, dict)
all_users = (User.from_dict(resource) for resource in response.get("Resources", []))
return (user for user in all_users if user.active and _has_role(user, "account_admin"))


def find_an_admin(ws: WorkspaceClient) -> User | None:
"""Locate an active administrator for the current workspace.

If an active workspace administrator can be located, this is returned. When there are multiple, they are sorted
alphabetically by user-name and the first is returned. If there are no workspace administrators then an active
account administrator is sought, again returning the first alphabetically by user-name if there is more than one.

Arguments:
ws (WorkspaceClient): The client for the workspace for which an administrator should be located.
Returns:
the first (alphabetically by user-name) active workspace or account administrator, or `None` if neither can be
found.
"""
first_user = functools.partial(min, default=None, key=lambda user: user.name)
return first_user(find_workspace_admins(ws)) or first_user(find_account_admins(ws))
Collaborator: don't forget to sort, because each invocation the first user will be different.

Author (Contributor): This implementation is already sorting: head(sort(…)) ≡ min(…).¹

¹ It was using the wrong attribute; it should be user.user_name and that's now fixed.

def run_command(command: str | list[str]) -> tuple[int, str, str]:
args = command.split() if isinstance(command, str) else command
logger.info(f"Invoking command: {args!r}")
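The review thread above turns on `find_an_admin` being deterministic: `min` with a `key` picks the same element as sorting and taking the head, and `default=None` lets an empty workspace-admin set fall through to the account admins via `or`. A sketch of that selection logic, with plain dicts standing in for the SDK's `User` records:

```python
import functools

# Stand-in user records; the real code uses databricks.sdk.service.iam.User
# and keys on user.user_name.
workspace_admins = [{"user_name": "zoe@example.com"}, {"user_name": "amy@example.com"}]
account_admins = [{"user_name": "bob@example.com"}]

# min(..., default=None) returns None for an empty iterable instead of raising,
# so the workspace lookup can fall through to the account lookup with `or`.
first_user = functools.partial(min, default=None, key=lambda u: u["user_name"])

def find_an_admin():
    return first_user(workspace_admins) or first_user(account_admins)

picked = find_an_admin()
# head(sort(xs)) and min(xs) agree when they use the same key:
assert picked == sorted(workspace_admins, key=lambda u: u["user_name"])[0]
```

Because `min` scans the iterable once without materialising a sorted copy, it is both deterministic and cheaper than `sorted(...)[0]` for the same result.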
3 changes: 2 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/grants.py
@@ -199,10 +199,11 @@ class GrantsCrawler(CrawlerBase[Grant]):
"""Crawler that captures access controls that relate to data and other securable objects."""

def __init__(self, tc: TablesCrawler, udf: UdfsCrawler, include_databases: list[str] | None = None):
assert tc._ws == udf._ws
assert tc._backend == udf._backend
assert tc._catalog == udf._catalog
assert tc._schema == udf._schema
super().__init__(tc._backend, tc._catalog, tc._schema, "grants", Grant)
super().__init__(tc._ws, tc._backend, tc._catalog, tc._schema, "grants", Grant)
self._tc = tc
self._udf = udf
self._include_databases = include_databases
11 changes: 4 additions & 7 deletions src/databricks/labs/ucx/hive_metastore/locations.py
@@ -117,8 +117,7 @@ class ExternalLocations(CrawlerBase[ExternalLocation]):
_prefix_size: ClassVar[list[int]] = [1, 12]

def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema: str):
super().__init__(sbe, "hive_metastore", schema, "external_locations", ExternalLocation)
self._ws = ws
super().__init__(ws, sbe, "hive_metastore", schema, "external_locations", ExternalLocation)

def _external_locations(self, tables: list[Row], mounts) -> Iterable[ExternalLocation]:
min_slash = 2
@@ -301,8 +300,7 @@ def save_as_terraform_definitions_on_workspace(self, installation: Installation)

class Mounts(CrawlerBase[Mount]):
def __init__(self, backend: SqlBackend, ws: WorkspaceClient, inventory_database: str):
super().__init__(backend, "hive_metastore", inventory_database, "mounts", Mount)
self._dbutils = ws.dbutils
super().__init__(ws, backend, "hive_metastore", inventory_database, "mounts", Mount)

@staticmethod
def _deduplicate_mounts(mounts: list) -> list:
@@ -320,7 +318,7 @@ def _deduplicate_mounts(mounts: list) -> list:

def _crawl(self) -> Iterable[Mount]:
mounts = []
for mount_point, source, _ in self._dbutils.fs.mounts():
for mount_point, source, _ in self._ws.dbutils.fs.mounts():
mounts.append(Mount(mount_point, source))
return self._deduplicate_mounts(mounts)

@@ -356,11 +354,10 @@ def __init__(
exclude_paths_in_mount: list[str] | None = None,
include_paths_in_mount: list[str] | None = None,
):
super().__init__(backend, "hive_metastore", inventory_database, "tables", Table)
super().__init__(ws, backend, "hive_metastore", inventory_database, "tables", Table)
self._dbutils = ws.dbutils
self._mounts_crawler = mc
self._include_mounts = include_mounts
self._ws = ws
self._include_paths_in_mount = include_paths_in_mount

irrelevant_patterns = {'_SUCCESS', '_committed_', '_started_'}
@@ -76,8 +76,7 @@ class TableMigrationStatusRefresher(CrawlerBase[TableMigrationStatus]):
"""

def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema, table_crawler: TablesCrawler):
super().__init__(sbe, "hive_metastore", schema, "migration_status", TableMigrationStatus)
self._ws = ws
super().__init__(ws, sbe, "hive_metastore", schema, "migration_status", TableMigrationStatus)
self._table_crawler = table_crawler

def index(self, *, force_refresh: bool = False) -> TableMigrationIndex: