Skip to content

Improve creating UC catalogs #2898

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 22 additions & 28 deletions src/databricks/labs/ucx/hive_metastore/catalog_schema.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import collections
import logging
from dataclasses import replace
import fnmatch
from pathlib import PurePath

from databricks.labs.blueprint.tui import Prompts
Expand Down Expand Up @@ -30,7 +29,7 @@ def __init__(
):
self._ws = ws
self._table_mapping = table_mapping
self._external_locations = self._ws.external_locations.list()
self._external_locations = list(self._ws.external_locations.list())
self._principal_grants = principal_grants
self._backend = sql_backend
self._hive_grants_crawler = grants_crawler
Expand All @@ -45,32 +44,19 @@ def create_ucx_catalog(self, prompts: Prompts, *, properties: dict[str, str] | N
properties : (dict[str, str] | None), default None
The properties to pass to the catalog. If None, no properties are passed.
"""
try:
self._create_catalog_validate(self._ucx_catalog, prompts, properties=properties)
except BadRequest as e:
if "already exists" in str(e):
logger.warning(f"Catalog '{self._ucx_catalog}' already exists. Skipping.")
return
raise
self._create_catalog_validate(self._ucx_catalog, prompts, properties=properties)

def create_all_catalogs_schemas(self, prompts: Prompts) -> None:
candidate_catalogs, candidate_schemas = self._get_missing_catalogs_schemas()
for candidate_catalog in candidate_catalogs:
try:
self._create_catalog_validate(candidate_catalog, prompts, properties=None)
except BadRequest as e:
if "already exists" in str(e):
logger.warning(f"Catalog '{candidate_catalog}' already exists. Skipping.")
continue
self._create_catalog_validate(candidate_catalog, prompts, properties=None)
for candidate_catalog, schemas in candidate_schemas.items():
for candidate_schema in schemas:
try:
self._create_schema(candidate_catalog, candidate_schema)
except BadRequest as e:
if "already exists" in str(e):
logger.warning(
f"Schema '{candidate_schema}' in catalog '{candidate_catalog}' already exists. Skipping."
)
logger.warning(f"Skipping already existing schema: {candidate_catalog}.{candidate_schema}")
continue
self._apply_from_legacy_table_acls()
self._update_principal_acl()
Expand Down Expand Up @@ -141,19 +127,28 @@ def _get_database_source_target_mapping(self) -> dict[str, list[SchemaInfo]]:
src_trg_schema_mapping[table_mapping.src_schema].append(schema)
return src_trg_schema_mapping

def _create_catalog_validate(
    self, catalog_name: str, prompts: Prompts, *, properties: dict[str, str] | None
) -> None:
    """Create a UC catalog after asking for a valid storage location.

    Nothing is created when the workspace already has a catalog with this name; a warning
    is logged instead. Otherwise the user is asked for a storage location, at most three
    times, until the answer passes `_validate_location`.

    Args:
        catalog_name: Name of the catalog to create.
        prompts: Used to ask for the storage location of the catalog.
        properties: Properties passed on to the catalog creation, or None.

    Raises:
        NotFound: If none of the three answers is a valid location.
    """
    try:
        catalog = self._ws.catalogs.get(catalog_name)
    except NotFound:
        catalog = None
    if catalog:
        logger.warning(f"Skipping already existing catalog: {catalog_name}")
        return
    logger.info(f"Validating UC catalog: {catalog_name}")
    attempts = 3
    while True:
        catalog_storage = prompts.question(
            f"Please provide storage location url for catalog: {catalog_name}", default="metastore"
        )
        if self._validate_location(catalog_storage):
            break
        attempts -= 1
        if attempts == 0:
            raise NotFound(f"Failed to validate location for catalog: {catalog_name}")
    self._create_catalog(catalog_name, catalog_storage, properties=properties)

def _list_existing(self) -> tuple[set[str], dict[str, set[str]]]:
"""generate a list of existing UC catalogs and schema."""
Expand Down Expand Up @@ -203,19 +198,18 @@ def _get_missing_catalogs_schemas(self) -> tuple[set[str], dict[str, set[str]]]:
target_schemas[catalog] = target_schemas[catalog] - schemas
return target_catalogs, target_schemas

def _validate_location(self, location: str) -> bool:
    """Check whether a storage location may be used for a catalog.

    The literal "metastore" is always accepted. Any other value has to be the URL of one of
    the known external locations, or a path nested below such a URL.

    Args:
        location: The storage location to validate.

    Returns:
        True when the location is accepted, False otherwise (the reason is logged).
    """
    if location == "metastore":
        return True
    try:
        PurePath(location)
    except ValueError:
        logger.error(f"Invalid location path: {location}")
        return False
    for external_location in self._external_locations:
        url = external_location.url
        if url is None:
            continue
        if location == url:
            return True
        # Compare on a path-segment boundary: a bare prefix test would treat a sibling
        # such as "s3://foo/barbaz" as if it were nested below "s3://foo/bar".
        if location.startswith(url.rstrip("/") + "/"):
            return True
    logger.warning(f"No matching external location found for: {location}")
    return False

def _create_catalog(self, catalog: str, catalog_storage: str, *, properties: dict[str, str] | None) -> None:
Expand Down
52 changes: 46 additions & 6 deletions tests/unit/hive_metastore/test_catalog_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,23 @@
def prepare_test(ws, backend: MockBackend | None = None) -> CatalogSchema:
ws.catalogs.list.return_value = [CatalogInfo(name="catalog1")]

def get_catalog(catalog_name: str) -> CatalogInfo:
if catalog_name == "catalog1":
return CatalogInfo(name="catalog1")
raise NotFound(f"Catalog: {catalog_name}")

ws.catalogs.get.side_effect = get_catalog

def raise_catalog_exists(catalog: str, *_, **__) -> None:
if catalog == "catalog1":
raise BadRequest("Catalog 'catalog1' already exists")

ws.catalogs.create.side_effect = raise_catalog_exists
ws.schemas.list.return_value = [SchemaInfo(name="schema1")]
ws.external_locations.list.return_value = [ExternalLocationInfo(url="s3://foo/bar")]
ws.external_locations.list.return_value = [
ExternalLocationInfo(url="s3://foo/bar"),
ExternalLocationInfo(url="abfss://container@storageaccount.dfs.core.windows.net"),
]
if backend is None:
backend = MockBackend()
installation = MockInstallation(
Expand Down Expand Up @@ -133,8 +143,8 @@ def test_create_ucx_catalog_creates_ucx_catalog() -> None:

def test_create_ucx_catalog_skips_when_ucx_catalogs_exists(caplog) -> None:
ws = create_autospec(WorkspaceClient)
mock_prompts = MockPrompts({"Please provide storage location url for catalog: ucx": "metastore"})
catalog_schema = prepare_test(ws)
ws.catalogs.get.side_effect = lambda catalog_name: CatalogInfo(name=catalog_name)

def raise_catalog_exists(catalog: str, *_, **__) -> None:
if catalog == "ucx":
Expand All @@ -143,12 +153,20 @@ def raise_catalog_exists(catalog: str, *_, **__) -> None:
ws.catalogs.create.side_effect = raise_catalog_exists

with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore.catalog_schema"):
catalog_schema.create_ucx_catalog(mock_prompts)
assert "Catalog 'ucx' already exists. Skipping." in caplog.text
catalog_schema.create_ucx_catalog(MockPrompts({}))
assert "Skipping already existing catalog: ucx" in caplog.text


@pytest.mark.parametrize("location", ["s3://foo/bar", "s3://foo/bar/test", "s3://foo/bar/test/baz"])
def test_create_all_catalogs_schemas_creates_catalogs(location: str):
@pytest.mark.parametrize(
"location",
[
"s3://foo/bar",
"s3://foo/bar/test",
"s3://foo/bar/test/baz",
"abfss://container@storageaccount.dfs.core.windows.net",
],
)
def test_create_all_catalogs_schemas_creates_catalogs(location: str) -> None:
"""Catalog 2-4 should be created; catalog 1 already exists."""
ws = create_autospec(WorkspaceClient)
mock_prompts = MockPrompts({"Please provide storage location url for catalog: *": location})
Expand All @@ -164,6 +182,28 @@ def test_create_all_catalogs_schemas_creates_catalogs(location: str):
ws.catalogs.create.assert_has_calls(calls, any_order=True)


def test_create_all_catalogs_schemas_creates_catalogs_with_different_locations() -> None:
    """Catalog 2-4 should be created, each at its own location; catalog 1 already exists."""
    # Single source of truth for both the prompt answers and the expected create calls.
    storage_by_catalog = {
        "catalog2": "s3://foo/bar",
        "catalog3": "s3://foo/bar/test",
        "catalog4": "s3://foo/bar/test/baz",
    }
    answers = {
        f"Please provide storage location url for catalog: {name}": storage
        for name, storage in storage_by_catalog.items()
    }
    ws = create_autospec(WorkspaceClient)
    mock_prompts = MockPrompts(answers)

    catalog_schema = prepare_test(ws)
    catalog_schema.create_all_catalogs_schemas(mock_prompts)

    expected_calls = [
        call(name, storage_root=storage, comment="Created by UCX", properties=None)
        for name, storage in storage_by_catalog.items()
    ]
    ws.catalogs.create.assert_has_calls(expected_calls, any_order=True)


@pytest.mark.parametrize(
"catalog,schema",
[("catalog1", "schema2"), ("catalog1", "schema3"), ("catalog2", "schema2"), ("catalog3", "schema3")],
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,8 @@ def test_create_catalogs_schemas_handles_existing(ws, caplog) -> None:
create_catalogs_schemas(ws, prompts, ctx=WorkspaceContext(ws))
ws.catalogs.list.assert_called_once()

assert "Catalog 'test' already exists. Skipping." in caplog.messages
assert "Schema 'test' in catalog 'test' already exists. Skipping." in caplog.messages
assert "Skipping already existing catalog: test" in caplog.messages
assert "Skipping already existing schema: test.test" in caplog.messages


def test_cluster_remap(ws, caplog):
Expand Down Expand Up @@ -887,12 +887,12 @@ def test_assign_metastore_logs_account_id_and_assigns_metastore(caplog, acc_clie
acc_client.metastore_assignments.create.assert_called_once()


def test_create_ucx_catalog_calls_get_catalog(ws) -> None:
    """Creating the UCX catalog should look the catalog up exactly once."""
    prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"})

    create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws))

    ws.catalogs.get.assert_called_once()


def test_create_ucx_catalog_creates_history_schema_and_table(ws, mock_backend) -> None:
Expand Down
Loading