
Commit fd645fd

Add Support for migrating Schema/Catalog ACL for Interactive cluster (#1413)
## Changes

- Identified all the database ACL grants from the `PrincipalACL` class and filtered for database-level grants only
- Creates the mapping from hive metastore schema to UC schema and catalog using the table mapping
- Replaces the hive metastore action with the equivalent UC action, for both schema and catalog
- Handles both AWS and Azure
- Note: this PR doesn't cover external location permissions, which are part of the same issue; a separate PR will follow for that

### Linked issues

Partially fixes #1192 and #1193

### Functionality

- [ ] added relevant user documentation
- [ ] added new CLI command
- [ ] modified existing command: `databricks labs ucx ...`
- [ ] added a new workflow
- [ ] modified existing workflow: `...`
- [ ] added a new table
- [ ] modified existing table: `...`

### Tests

- [x] manually tested
- [x] added unit tests
- [x] added integration tests
- [ ] verified on staging environment (screenshot attached)
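
In short, the new code takes the database-level grants that `PrincipalACL` infers from interactive-cluster permissions and re-targets them at the UC objects chosen by the table mapping. A minimal sketch of that re-targeting, using a simplified stand-in for ucx's `Grant` dataclass (the real one lives in `databricks.labs.ucx.hive_metastore.grants` and has more fields):

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class Grant:
    # simplified stand-in for ucx's Grant dataclass
    principal: str
    action_type: str
    catalog: str
    database: str | None = None


# hypothetical source-to-target mapping, as derived from the table mapping rules
mapping = {"sales": {"target_catalog": "main", "target_schema": "sales"}}

hms_grant = Grant("user1", "USAGE", "hive_metastore", "sales")

# schema-level grant: swap in the target UC catalog and schema
schema_grant = dataclasses.replace(
    hms_grant,
    catalog=mapping[hms_grant.database]["target_catalog"],
    database=mapping[hms_grant.database]["target_schema"],
)
# catalog-level grant: same principal and action, database dropped
catalog_grant = dataclasses.replace(schema_grant, database=None)

print(schema_grant)   # Grant(principal='user1', action_type='USAGE', catalog='main', database='sales')
print(catalog_grant)  # Grant(principal='user1', action_type='USAGE', catalog='main', database=None)
```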
1 parent e94dc34 commit fd645fd

File tree

9 files changed: +209, -51 lines


src/databricks/labs/ucx/contexts/application.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -303,7 +303,7 @@ def table_mapping(self):
 
     @cached_property
     def catalog_schema(self):
-        return CatalogSchema(self.workspace_client, self.table_mapping)
+        return CatalogSchema(self.workspace_client, self.table_mapping, self.principal_acl, self.sql_backend)
 
     @cached_property
     def languages(self):
```

src/databricks/labs/ucx/hive_metastore/catalog_schema.py

Lines changed: 55 additions & 1 deletion

```diff
@@ -1,7 +1,10 @@
 import logging
 from pathlib import PurePath
+import dataclasses
 
 from databricks.labs.blueprint.tui import Prompts
+from databricks.labs.lsql.backends import SqlBackend
+from databricks.labs.ucx.hive_metastore.grants import PrincipalACL, Grant
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.errors import NotFound
 
@@ -11,10 +14,14 @@
 
 
 class CatalogSchema:
-    def __init__(self, ws: WorkspaceClient, table_mapping: TableMapping):
+    def __init__(
+        self, ws: WorkspaceClient, table_mapping: TableMapping, principal_grants: PrincipalACL, sql_backend: SqlBackend
+    ):
         self._ws = ws
         self._table_mapping = table_mapping
         self._external_locations = self._ws.external_locations.list()
+        self._principal_grants = principal_grants
+        self._backend = sql_backend
 
     def create_all_catalogs_schemas(self, prompts: Prompts):
         candidate_catalogs, candidate_schemas = self._get_missing_catalogs_schemas()
@@ -23,6 +30,53 @@ def create_all_catalogs_schemas(self, prompts: Prompts):
         for candidate_catalog, schemas in candidate_schemas.items():
             for candidate_schema in schemas:
                 self._create_schema(candidate_catalog, candidate_schema)
+        self._update_principal_acl()
+
+    def _update_principal_acl(self):
+        grants = self._get_catalog_schema_grants()
+        for grant in grants:
+            acl_migrate_sql = grant.uc_grant_sql()
+            if acl_migrate_sql is None:
+                logger.warning(f"Cannot identify UC grant for {grant.this_type_and_key()}. Skipping.")
+                continue
+            logger.debug(f"Migrating acls on {grant.this_type_and_key()} using SQL query: {acl_migrate_sql}")
+            self._backend.execute(acl_migrate_sql)
+
+    def _get_catalog_schema_grants(self):
+        catalog_grants: set[Grant] = set()
+        new_grants = []
+        src_trg_schema_mapping = self._get_database_source_target_mapping()
+        grants = self._principal_grants.get_interactive_cluster_grants()
+        # filter the grants to keep only database-level grants
+        database_grants = [grant for grant in grants if grant.table is None and grant.view is None]
+        for db_grant in database_grants:
+            new_grants.append(
+                dataclasses.replace(
+                    db_grant,
+                    # replace the source database with the target UC database
+                    database=src_trg_schema_mapping[db_grant.database]['target_schema'],
+                    # replace hive_metastore with the target UC catalog
+                    catalog=src_trg_schema_mapping[db_grant.database]['target_catalog'],
+                )
+            )
+        for grant in new_grants:
+            catalog_grants.add(dataclasses.replace(grant, database=None))
+        new_grants.extend(catalog_grants)
+        return new_grants
+
+    def _get_database_source_target_mapping(self) -> dict[str, dict]:
+        """Generate a dictionary mapping each source database in hive_metastore
+        to its target UC catalog and schema, based on the table mappings."""
+        src_trg_schema_mapping: dict[str, dict] = {}
+        table_mappings = self._table_mapping.load()
+        for mappings in table_mappings:
+            if mappings.src_schema not in src_trg_schema_mapping:
+                src_trg_schema_mapping[mappings.src_schema] = {
+                    'target_catalog': mappings.catalog_name,
+                    'target_schema': mappings.dst_schema,
+                }
+                continue
+        return src_trg_schema_mapping
 
     def _create_catalog_validate(self, catalog, prompts: Prompts):
         logger.info(f"Creating UC catalog: {catalog}")
```

src/databricks/labs/ucx/hive_metastore/grants.py

Lines changed: 2 additions & 3 deletions

```diff
@@ -166,6 +166,7 @@ def uc_grant_sql(self, object_type: str | None = None, object_key: str | None =
             ("DATABASE", "OWN"): self._set_owner_sql,
             ("DATABASE", "READ_METADATA"): self._uc_action("BROWSE"),
             ("CATALOG", "OWN"): self._set_owner_sql,
+            ("CATALOG", "USAGE"): self._uc_action("USE CATALOG"),
         }
         make_query = hive_to_uc.get((object_type, self.action_type), None)
         if make_query is None:
@@ -530,8 +531,6 @@ def get_interactive_cluster_grants(self) -> list[Grant]:
                 continue
             cluster_usage = self._get_grants(locations, principals, tables, mounts)
             grants.update(cluster_usage)
-            catalog_grants = [Grant(principal, "USE", "hive_metastore") for principal in principals]
-            grants.update(catalog_grants)
         return list(grants)
 
     def _get_privilege(self, table: Table, locations: dict[str, str], mounts: list[Mount]):
@@ -557,7 +556,7 @@ def _get_privilege(self, table: Table, locations: dict[str, str], mounts: list[M
     def _get_database_grants(self, tables: list[Table], principals: list[str]) -> list[Grant]:
         databases = {table.database for table in tables}
         return [
-            Grant(principal, "USE", "hive_metastore", database) for database in databases for principal in principals
+            Grant(principal, "USAGE", "hive_metastore", database) for database in databases for principal in principals
         ]
 
     def _get_grants(
```
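
The new `("CATALOG", "USAGE")` entry means a legacy `USAGE` grant on a catalog now renders as a UC `USE CATALOG` statement. A hedged sketch of the lookup-table pattern `uc_grant_sql` uses (names simplified; the real table covers many more action pairs and `_uc_action` is a method on `Grant`):

```python
def _uc_action(action: str):
    # returns a function that renders the GRANT statement for a given object
    def make_query(object_type: str, object_key: str, principal: str) -> str:
        return f"GRANT {action} ON {object_type} {object_key} TO `{principal}`"
    return make_query

hive_to_uc = {
    ("DATABASE", "USAGE"): _uc_action("USE SCHEMA"),
    ("CATALOG", "USAGE"): _uc_action("USE CATALOG"),  # the pair this commit adds
}

make_query = hive_to_uc.get(("CATALOG", "USAGE"))
print(make_query("CATALOG", "catalog1", "user1"))
# GRANT USE CATALOG ON CATALOG catalog1 TO `user1`
```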

tests/integration/conftest.py

Lines changed: 21 additions & 0 deletions

```diff
@@ -665,3 +665,24 @@ def prepare_tables_for_migration(
     installation_ctx.save_mounts()
     installation_ctx.with_dummy_grants_and_tacls()
     return tables, dst_schema
+
+
+@pytest.fixture
+def prepared_principal_acl(runtime_ctx, env_or_skip, make_mounted_location, make_catalog, make_schema):
+    src_schema = make_schema(catalog_name="hive_metastore")
+    src_external_table = runtime_ctx.make_table(
+        catalog_name=src_schema.catalog_name,
+        schema_name=src_schema.name,
+        external_csv=make_mounted_location,
+    )
+    dst_catalog = make_catalog()
+    dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)
+    rules = [Rule.from_src_dst(src_external_table, dst_schema)]
+    runtime_ctx.with_table_mapping_rules(rules)
+    runtime_ctx.with_dummy_resource_permission()
+    return (
+        runtime_ctx,
+        f"{dst_catalog.name}.{dst_schema.name}.{src_external_table.name}",
+        f"{dst_catalog.name}.{dst_schema.name}",
+        dst_catalog.name,
+    )
```
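
The fixture now returns a 4-tuple so the existing table-migration tests and the new catalog/schema tests can share it; callers discard the elements they don't need. A minimal illustration of the unpacking, with hypothetical values:

```python
# shape: (runtime_ctx, table_full_name, schema_full_name, catalog_name)
prepared = ("runtime_ctx", "main.sales.orders", "main.sales", "main")

ctx, table_full_name, _, _ = prepared         # test_migrate.py style
ctx, _, schema_name, catalog_name = prepared  # test_catalog_schema.py style
```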

New file (path not shown in this capture; presumably `tests/integration/hive_metastore/__init__.py`, given the later `from . import get_azure_spark_conf` imports)

Lines changed: 12 additions & 0 deletions

```diff
@@ -0,0 +1,12 @@
+def get_azure_spark_conf():
+    return {
+        "spark.databricks.cluster.profile": "singleNode",
+        "spark.master": "local[*]",
+        "fs.azure.account.auth.type.labsazurethings.dfs.core.windows.net": "OAuth",
+        "fs.azure.account.oauth.provider.type.labsazurethings.dfs.core.windows.net": "org.apache.hadoop.fs"
+        ".azurebfs.oauth2.ClientCredsTokenProvider",
+        "fs.azure.account.oauth2.client.id.labsazurethings.dfs.core.windows.net": "dummy_application_id",
+        "fs.azure.account.oauth2.client.secret.labsazurethings.dfs.core.windows.net": "dummy",
+        "fs.azure.account.oauth2.client.endpoint.labsazurethings.dfs.core.windows.net": "https://login"
+        ".microsoftonline.com/directory_12345/oauth2/token",
+    }
```

New file (path not shown in this capture; presumably `tests/integration/hive_metastore/test_catalog_schema.py`)

Lines changed: 72 additions & 0 deletions

```diff
@@ -0,0 +1,72 @@
+import logging
+from datetime import timedelta
+import pytest
+
+from databricks.labs.blueprint.tui import MockPrompts
+
+from databricks.sdk.errors import NotFound
+from databricks.sdk.retries import retried
+from databricks.sdk.service.compute import DataSecurityMode, AwsAttributes
+from databricks.sdk.service.catalog import Privilege, SecurableType, PrivilegeAssignment
+from databricks.sdk.service.iam import PermissionLevel
+
+from . import get_azure_spark_conf
+
+logger = logging.getLogger(__name__)
+_SPARK_CONF = get_azure_spark_conf()
+
+
+@retried(on=[NotFound], timeout=timedelta(minutes=2))
+def test_create_catalog_schema_with_principal_acl_azure(
+    ws, make_user, prepared_principal_acl, make_cluster_permissions, make_cluster
+):
+    if not ws.config.is_azure:
+        pytest.skip("only works in azure test env")
+    ctx, _, schema_name, catalog_name = prepared_principal_acl
+
+    cluster = make_cluster(single_node=True, spark_conf=_SPARK_CONF, data_security_mode=DataSecurityMode.NONE)
+    user = make_user()
+    make_cluster_permissions(
+        object_id=cluster.cluster_id,
+        permission_level=PermissionLevel.CAN_ATTACH_TO,
+        user_name=user.user_name,
+    )
+    catalog_schema = ctx.catalog_schema
+    mock_prompts = MockPrompts({"Please provide storage location url for catalog: *": ""})
+    catalog_schema.create_all_catalogs_schemas(mock_prompts)
+
+    schema_grants = ws.grants.get(SecurableType.SCHEMA, schema_name)
+    catalog_grants = ws.grants.get(SecurableType.CATALOG, catalog_name)
+    schema_grant = PrivilegeAssignment(user.user_name, [Privilege.USE_SCHEMA])
+    catalog_grant = PrivilegeAssignment(user.user_name, [Privilege.USE_CATALOG])
+    assert schema_grant in schema_grants.privilege_assignments
+    assert catalog_grant in catalog_grants.privilege_assignments
+
+
+@retried(on=[NotFound], timeout=timedelta(minutes=3))
+def test_create_catalog_schema_with_principal_acl_aws(
+    ws, make_user, prepared_principal_acl, make_cluster_permissions, make_cluster, env_or_skip
+):
+    ctx, _, schema_name, catalog_name = prepared_principal_acl
+
+    cluster = make_cluster(
+        single_node=True,
+        data_security_mode=DataSecurityMode.NONE,
+        aws_attributes=AwsAttributes(instance_profile_arn=env_or_skip("TEST_WILDCARD_INSTANCE_PROFILE")),
+    )
+    user = make_user()
+    make_cluster_permissions(
+        object_id=cluster.cluster_id,
+        permission_level=PermissionLevel.CAN_ATTACH_TO,
+        user_name=user.user_name,
+    )
+    catalog_schema = ctx.catalog_schema
+    mock_prompts = MockPrompts({"Please provide storage location url for catalog: *": ""})
+    catalog_schema.create_all_catalogs_schemas(mock_prompts)
+
+    schema_grants = ws.grants.get(SecurableType.SCHEMA, schema_name)
+    catalog_grants = ws.grants.get(SecurableType.CATALOG, catalog_name)
+    schema_grant = PrivilegeAssignment(user.user_name, [Privilege.USE_SCHEMA])
+    catalog_grant = PrivilegeAssignment(user.user_name, [Privilege.USE_CATALOG])
+    assert schema_grant in schema_grants.privilege_assignments
+    assert catalog_grant in catalog_grants.privilege_assignments
```

tests/integration/hive_metastore/test_migrate.py

Lines changed: 4 additions & 31 deletions

```diff
@@ -10,20 +10,11 @@
 
 from databricks.labs.ucx.hive_metastore.mapping import Rule, TableMapping
 from databricks.labs.ucx.hive_metastore.tables import AclMigrationWhat, Table, What
+from . import get_azure_spark_conf
 
 
 logger = logging.getLogger(__name__)
-_SPARK_CONF = {
-    "spark.databricks.cluster.profile": "singleNode",
-    "spark.master": "local[*]",
-    "fs.azure.account.auth.type.labsazurethings.dfs.core.windows.net": "OAuth",
-    "fs.azure.account.oauth.provider.type.labsazurethings.dfs.core.windows.net": "org.apache.hadoop.fs"
-    ".azurebfs.oauth2.ClientCredsTokenProvider",
-    "fs.azure.account.oauth2.client.id.labsazurethings.dfs.core.windows.net": "dummy_application_id",
-    "fs.azure.account.oauth2.client.secret.labsazurethings.dfs.core.windows.net": "dummy",
-    "fs.azure.account.oauth2.client.endpoint.labsazurethings.dfs.core.windows.net": "https://login"
-    ".microsoftonline.com/directory_12345/oauth2/token",
-}
+_SPARK_CONF = get_azure_spark_conf()
 
 
 @retried(on=[NotFound], timeout=timedelta(minutes=2))
@@ -425,31 +416,13 @@ def test_migrate_managed_tables_with_acl(ws, sql_backend, runtime_ctx, make_cata
     assert target_table_grants.privilege_assignments[0].privileges == [Privilege.MODIFY, Privilege.SELECT]
 
 
-@pytest.fixture
-def prepared_principal_acl(runtime_ctx, env_or_skip, make_mounted_location, make_catalog, make_schema):
-    src_schema = make_schema(catalog_name="hive_metastore")
-    src_external_table = runtime_ctx.make_table(
-        catalog_name=src_schema.catalog_name,
-        schema_name=src_schema.name,
-        external_csv=make_mounted_location,
-    )
-    dst_catalog = make_catalog()
-    dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)
-    rules = [Rule.from_src_dst(src_external_table, dst_schema)]
-    runtime_ctx.with_table_mapping_rules(rules)
-    return (
-        runtime_ctx,
-        f"{dst_catalog.name}.{dst_schema.name}.{src_external_table.name}",
-    )
-
-
 @retried(on=[NotFound], timeout=timedelta(minutes=2))
 def test_migrate_external_tables_with_principal_acl_azure(
     ws, make_user, prepared_principal_acl, make_cluster_permissions, make_cluster, make_ucx_group
 ):
     if not ws.config.is_azure:
         pytest.skip("only works in azure test env")
-    ctx, table_full_name = prepared_principal_acl
+    ctx, table_full_name, _, _ = prepared_principal_acl
     cluster = make_cluster(single_node=True, spark_conf=_SPARK_CONF, data_security_mode=DataSecurityMode.NONE)
     ctx.with_dummy_resource_permission()
     table_migrate = ctx.tables_migrator
@@ -490,7 +463,7 @@ def test_migrate_external_tables_with_principal_acl_azure(
 def test_migrate_external_tables_with_principal_acl_aws(
     ws, make_user, prepared_principal_acl, make_cluster_permissions, make_cluster, env_or_skip
 ):
-    ctx, table_full_name = prepared_principal_acl
+    ctx, table_full_name, _, _ = prepared_principal_acl
     ctx.with_dummy_resource_permission()
     cluster = make_cluster(
         single_node=True,
```

tests/unit/hive_metastore/test_catalog_schema.py

Lines changed: 36 additions & 5 deletions

```diff
@@ -9,14 +9,16 @@
 from databricks.sdk.service.catalog import CatalogInfo, ExternalLocationInfo, SchemaInfo
 
 from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
+from databricks.labs.ucx.hive_metastore.grants import PrincipalACL, Grant
 from databricks.labs.ucx.hive_metastore.mapping import TableMapping
 
 
-def prepare_test(ws) -> CatalogSchema:
+def prepare_test(ws, backend: MockBackend | None = None) -> CatalogSchema:
     ws.catalogs.list.return_value = [CatalogInfo(name="catalog1")]
     ws.schemas.list.return_value = [SchemaInfo(name="schema1")]
     ws.external_locations.list.return_value = [ExternalLocationInfo(url="s3://foo/bar")]
-    backend = MockBackend()
+    if backend is None:
+        backend = MockBackend()
     installation = MockInstallation(
         {
             'mapping.csv': [
@@ -48,16 +50,26 @@ def prepare_test(ws) -> CatalogSchema:
         }
     )
     table_mapping = TableMapping(installation, ws, backend)
-
-    return CatalogSchema(ws, table_mapping)
+    principal_acl = create_autospec(PrincipalACL)
+    grants = [
+        Grant('user1', 'SELECT', 'catalog1', 'schema3', 'table'),
+        Grant('user1', 'MODIFY', 'catalog2', 'schema2', 'table'),
+        Grant('user1', 'SELECT', 'catalog2', 'schema2', 'table2'),
+        Grant('user1', 'USAGE', 'hive_metastore', 'schema3'),
+        Grant('user1', 'USAGE', 'hive_metastore', 'schema2'),
+    ]
+    principal_acl.get_interactive_cluster_grants.return_value = grants
+    return CatalogSchema(ws, table_mapping, principal_acl, backend)
 
 
 def test_create():
     ws = create_autospec(WorkspaceClient)
     mock_prompts = MockPrompts({"Please provide storage location url for catalog: *": "s3://foo/bar"})
 
     catalog_schema = prepare_test(ws)
-    catalog_schema.create_all_catalogs_schemas(mock_prompts)
+    catalog_schema.create_all_catalogs_schemas(
+        mock_prompts,
+    )
     ws.catalogs.create.assert_called_once_with("catalog2", storage_root="s3://foo/bar", comment="Created by UCX")
     ws.schemas.create.assert_any_call("schema2", "catalog2", comment="Created by UCX")
     ws.schemas.create.assert_any_call("schema3", "catalog1", comment="Created by UCX")
@@ -89,3 +101,22 @@ def test_no_catalog_storage():
     catalog_schema = prepare_test(ws)
     catalog_schema.create_all_catalogs_schemas(mock_prompts)
     ws.catalogs.create.assert_called_once_with("catalog2", comment="Created by UCX")
+
+
+def test_catalog_schema_acl():
+    ws = create_autospec(WorkspaceClient)
+    backend = MockBackend()
+    mock_prompts = MockPrompts({"Please provide storage location url for catalog: *": ""})
+    catalog_schema = prepare_test(ws, backend)
+    catalog_schema.create_all_catalogs_schemas(
+        mock_prompts,
+    )
+    queries = [
+        'GRANT USE SCHEMA ON DATABASE catalog1.schema3 TO `user1`',
+        'GRANT USE SCHEMA ON DATABASE catalog2.schema2 TO `user1`',
+        'GRANT USE CATALOG ON CATALOG catalog1 TO `user1`',
+        'GRANT USE CATALOG ON CATALOG catalog2 TO `user1`',
+    ]
+    assert len(backend.queries) == 4
+    for query in queries:
+        assert query in backend.queries
```
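
The assertions work because lsql's `MockBackend` records every statement passed to `execute()` in its `queries` list; a minimal sketch, assuming `databricks-labs-lsql` is installed:

```python
from databricks.labs.lsql.backends import MockBackend

backend = MockBackend()
backend.execute("GRANT USE CATALOG ON CATALOG catalog1 TO `user1`")
assert backend.queries == ["GRANT USE CATALOG ON CATALOG catalog1 TO `user1`"]
```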
