From 48c7968c5951a3f09fa26248aace40432a827929 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Thu, 17 Oct 2024 16:04:47 -0400 Subject: [PATCH 1/2] Created shell implementation for enhanced HMS fed creation --- src/databricks/labs/ucx/cli.py | 4 +- .../labs/ucx/hive_metastore/catalog_schema.py | 31 +++++++++++++++ .../labs/ucx/hive_metastore/federation.py | 39 +++++++++++-------- 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 3effed18e6..e4c0676c13 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -821,10 +821,10 @@ def export_assessment(w: WorkspaceClient, prompts: Prompts): @ucx.command -def create_federated_catalog(w: WorkspaceClient, _: Prompts): +def create_federated_catalog(w: WorkspaceClient, prompts: Prompts): """(Experimental) Create federated catalog from current workspace Hive Metastore.""" ctx = WorkspaceContext(w) - ctx.federation.register_internal_hms_as_federated_catalog() + ctx.federation.register_internal_hms_as_federated_catalog(prompts) @ucx.command diff --git a/src/databricks/labs/ucx/hive_metastore/catalog_schema.py b/src/databricks/labs/ucx/hive_metastore/catalog_schema.py index 91201a562b..a1cb37fba7 100644 --- a/src/databricks/labs/ucx/hive_metastore/catalog_schema.py +++ b/src/databricks/labs/ucx/hive_metastore/catalog_schema.py @@ -29,6 +29,9 @@ class Catalog: name: str """The catalog name""" + connection_name: str | None = None + """The connection name for the catalog used for federated catalogs.""" + @property def full_name(self) -> str: """The full name of the catalog. 
@@ -134,6 +137,34 @@ def create_ucx_catalog(self, prompts: Prompts, *, properties: dict[str, str] | N """ self._create_catalog_validate(Catalog(self._ucx_catalog), prompts, properties=properties) + def create_federated_catalog( + self, + prompts: Prompts, + catalog_name: str, + connection_name: str, + authorized_paths: str, + *, + properties: dict[str, str] | None = None, + ) -> None: + """Create the HMS Federated catalog. + + Args: + prompts : Prompts + The prompts object to use for interactive input. + catalog_name : str + The catalog name to use for the federated catalog. + connection_name : str + The connection name to use for the federated catalog. + authorized_paths : str + The authorized paths to pass to the catalog. + properties : (dict[str, str] | None), default None + The properties to pass to the catalog. If None, no properties are passed. + """ + if properties is None: + properties = {} + properties["authorized_paths"] = authorized_paths + self._create_catalog_validate(Catalog(catalog_name, connection_name), prompts, properties=properties) + def create_all_catalogs_schemas(self, prompts: Prompts, *, properties: dict[str, str] | None = None) -> None: """Create all UC catalogs and schemas reference by the table mapping file. 
diff --git a/src/databricks/labs/ucx/hive_metastore/federation.py b/src/databricks/labs/ucx/hive_metastore/federation.py index 861dc06a10..0c50fa1359 100644 --- a/src/databricks/labs/ucx/hive_metastore/federation.py +++ b/src/databricks/labs/ucx/hive_metastore/federation.py @@ -2,21 +2,21 @@ import logging from databricks.labs.blueprint.installation import Installation +from databricks.labs.blueprint.tui import Prompts from databricks.sdk import WorkspaceClient -from databricks.sdk.errors import AlreadyExists, NotFound, BadRequest +from databricks.sdk.errors import AlreadyExists, NotFound from databricks.sdk.service.catalog import ( ConnectionType, ConnectionInfo, SecurableType, Privilege, PermissionsChange, - CatalogInfo, ) from databricks.labs.ucx.account.workspaces import WorkspaceInfo from databricks.labs.ucx.config import WorkspaceConfig from databricks.labs.ucx.hive_metastore import ExternalLocations - +from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema logger = logging.getLogger(__name__) @@ -37,32 +37,37 @@ def __init__( workspace_client: WorkspaceClient, external_locations: ExternalLocations, workspace_info: WorkspaceInfo, + catalog_schema: CatalogSchema, enable_hms_federation: bool = False, ): self._workspace_client = workspace_client self._external_locations = external_locations self._workspace_info = workspace_info self._enable_hms_federation = enable_hms_federation + self._catalog_schema = catalog_schema - def register_internal_hms_as_federated_catalog(self) -> CatalogInfo: + def register_internal_hms_as_federated_catalog(self, prompts: Prompts) -> None: if not self._enable_hms_federation: raise RuntimeWarning('Run `databricks labs ucx enable-hms-federation` to enable HMS Federation') name = self._workspace_info.current() connection_info = self._get_or_create_connection(name) assert connection_info.name is not None - try: - return self._workspace_client.catalogs.create( - name=connection_info.name, - 
connection_name=connection_info.name, - options={"authorized_paths": self._get_authorized_paths()}, - ) - except BadRequest as err: - if err.error_code == 'CATALOG_ALREADY_EXISTS': - logger.info(f'Catalog {connection_info.name} already exists') - for catalog_info in self._workspace_client.catalogs.list(): - if catalog_info.name == connection_info.name: - return catalog_info - raise err + return self._catalog_schema.create_federated_catalog( + prompts, connection_info.name, connection_info.name, self._get_authorized_paths() + ) + # try: + # return self._workspace_client.catalogs.create( + # name=connection_info.name, + # connection_name=connection_info.name, + # options={"authorized_paths": self._get_authorized_paths()}, + # ) + # except BadRequest as err: + # if err.error_code == 'CATALOG_ALREADY_EXISTS': + # logger.info(f'Catalog {connection_info.name} already exists') + # for catalog_info in self._workspace_client.catalogs.list(): + # if catalog_info.name == connection_info.name: + # return catalog_info + # raise err def _get_or_create_connection(self, name: str) -> ConnectionInfo: try: From 7edb55b9a4b56256503c9ad57f87bf10f4242164 Mon Sep 17 00:00:00 2001 From: Liran Bareket Date: Fri, 18 Oct 2024 09:33:19 -0400 Subject: [PATCH 2/2] Created shell implementation for enhanced HMS fed creation --- .../labs/ucx/contexts/workspace_cli.py | 1 + .../labs/ucx/hive_metastore/catalog_schema.py | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workspace_cli.py b/src/databricks/labs/ucx/contexts/workspace_cli.py index 4308f1c61e..5db5f7961f 100644 --- a/src/databricks/labs/ucx/contexts/workspace_cli.py +++ b/src/databricks/labs/ucx/contexts/workspace_cli.py @@ -200,6 +200,7 @@ def federation(self): self.workspace_client, self.external_locations, self.workspace_info, + self.catalog_schema, self.config.enable_hms_federation, ) diff --git a/src/databricks/labs/ucx/hive_metastore/catalog_schema.py 
b/src/databricks/labs/ucx/hive_metastore/catalog_schema.py index a1cb37fba7..c14e6b8949 100644 --- a/src/databricks/labs/ucx/hive_metastore/catalog_schema.py +++ b/src/databricks/labs/ucx/hive_metastore/catalog_schema.py @@ -160,10 +160,11 @@ def create_federated_catalog( properties : (dict[str, str] | None), default None The properties to pass to the catalog. If None, no properties are passed. """ - if properties is None: - properties = {} - properties["authorized_paths"] = authorized_paths - self._create_catalog_validate(Catalog(catalog_name, connection_name), prompts, properties=properties) + + options = {} + options["authorized_paths"] = authorized_paths + self._create_catalog_validate(Catalog(catalog_name, connection_name), prompts, options=options, + properties=properties) def create_all_catalogs_schemas(self, prompts: Prompts, *, properties: dict[str, str] | None = None) -> None: """Create all UC catalogs and schemas reference by the table mapping file. @@ -207,6 +208,7 @@ def _create_catalog_validate( prompts: Prompts, *, properties: dict[str, str] | None, + options: dict[str, str] | None = None, ) -> Catalog: catalog_existing = self._get_catalog(catalog) if catalog_existing: @@ -223,7 +225,8 @@ def _create_catalog_validate( attempts -= 1 if attempts == 0: raise NotFound(f"Failed to validate location for catalog: {catalog.name}") - return self._create_catalog(catalog, catalog_storage, properties=properties) + return self._create_catalog(catalog, catalog_storage, connection_name=catalog.connection_name, + properties=properties, options=options) def _validate_location(self, location: str) -> bool: if location == "metastore": @@ -267,7 +270,9 @@ def _create_catalog( catalog: Catalog, catalog_storage: str, *, + connection_name: str | None = None, properties: dict[str, str] | None, + options: dict[str, str] | None = None, ) -> Catalog: logger.info(f"Creating UC catalog: {catalog.name}") if catalog_storage == "metastore": @@ -276,8 +281,10 @@ def _create_catalog( 
self._ws.catalogs.create( catalog.name, storage_root=catalog_storage, + connection_name=connection_name, comment="Created by UCX", properties=properties, + options=options, ) catalog_created = self._get_catalog(catalog) if catalog_created is None: