From bea7cfeab259f53bc176554b60be0d11df9a301f Mon Sep 17 00:00:00 2001
From: Andrew Snare
Date: Wed, 9 Oct 2024 14:39:02 +0200
Subject: [PATCH] Updated integration test verification that we can locate the user-name of an administrator. Verify that the administrator locator returns the user-name of a user that is an administrator.
---
 tests/integration/framework/test_owners.py | 51 ++++++++++++++++++++--
 1 file changed, 48 insertions(+), 3 deletions(-)
diff --git a/tests/integration/framework/test_owners.py b/tests/integration/framework/test_owners.py
index 670d5817a2..e570378f06 100644
--- a/tests/integration/framework/test_owners.py
+++ b/tests/integration/framework/test_owners.py
@@ -1,8 +1,53 @@
+import json
+
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.service import iam
+
 from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
-def test_fallback_workspace_admin(installation_ctx: RuntimeContext) -> None:
- """Verify that a workspace administrator can be found for our integration environment."""
+def _find_admins_group_id(ws: WorkspaceClient) -> str:
+ for group in ws.groups.list(attributes="id,displayName,meta", filter='displayName eq "admins"'):
+ if group.id and group.display_name == "admins" and group.meta and group.meta.resource_type == "WorkspaceGroup":
+ return group.id
+ msg = f"Could not locate workspace group in {ws.get_workspace_id()}: admins"
+ raise RuntimeError(msg)
+
+
+def _find_user_with_name(ws: WorkspaceClient, user_name: str) -> iam.User:
+ for user in ws.users.list(attributes="active,groups,roles,userName", filter=f"userName eq {json.dumps(user_name)}"):
+ if user.user_name == user_name:
+ return user
+ # Use debugger if this is not working to avoid internal usernames in public issues or CI logs.
+ msg = f"Could not locate user in workspace {ws.get_workspace_id()}: **REDACTED**"
+ raise RuntimeError(msg)
+
+
+def _user_is_member_of_group(user: iam.User, group_id: str) -> bool:
+ assert user.groups
+ return any(g for g in user.groups if g.value == group_id)
+
+
+def _user_has_role(user: iam.User, role_name: str) -> bool:
+ assert user.roles
+ return any(r for r in user.roles if r.value == role_name)
+
+
+def test_fallback_admin_user(ws, installation_ctx: RuntimeContext) -> None:
+ """Verify that an administrator can be found for our integration environment."""
 an_admin = installation_ctx.administrator_locator.get_workspace_administrator()
- assert "@" in an_admin
+ # The specific admin username that we get here depends on the set of current admins in the integration environment,
+ # so that can't be checked directly. Instead we check that either:
+ # a) they're a member of the 'admins' workspace; or
+ # b) are an account admin (with the `account_admin` role assigned).
+ # They must also be an active user.
+ #
+ # References:
+ # https://learn.microsoft.com/en-us/azure/databricks/admin/users-groups/groups#account-admin
+ # https://learn.microsoft.com/en-us/azure/databricks/admin/users-groups/groups#account-vs-workspace-group
+ admins_group_id = _find_admins_group_id(ws)
+ the_user = _find_user_with_name(ws, an_admin)
+
+ assert an_admin == the_user.user_name and the_user.active
+ assert _user_is_member_of_group(the_user, admins_group_id) or _user_has_role(the_user, "account_admin")
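Review bot: The two closing asserts in `test_fallback_admin_user` compare and pass `an_admin` and `the_user` directly, so on failure pytest's assertion introspection will most likely print the user name and the `iam.User` repr into the CI log, which is exactly what the `**REDACTED**` message in `_find_user_with_name` is trying to avoid. Reducing each check to a plain boolean first keeps those values out of the report, e.g. `is_admin = _user_is_member_of_group(the_user, admins_group_id) or _user_has_role(the_user, "account_admin")` followed by `assert is_admin, "located user is neither in 'admins' nor an account admin"`, and the same treatment for the user-name/active check. Separately, `assert user.groups` and `assert user.roles` in the helpers raise instead of returning False when the list is empty or None, so the `or` never reaches the role branch for a user with no groups; `return any(g.value == group_id for g in user.groups or [])` (and likewise for roles) avoids that.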