Skip to content

Commit e768af2

Browse files
authored
MySQL External HMS Support for HMS Federation (#3385)
Added support for SQL Based HMS (MySQL) when creating an HMS Federated Catalog
1 parent 938e4c5 commit e768af2

File tree

11 files changed

+284
-42
lines changed

11 files changed

+284
-42
lines changed

src/databricks/labs/ucx/aws/access.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def get_roles_to_migrate(self) -> list[AWSCredentialCandidate]:
226226
"""
227227
Identify the roles that need to be migrated to UC from the UC compatible roles list.
228228
"""
229-
external_locations = self._locations.external_locations_with_root()
229+
external_locations = list(self._locations.external_locations_with_root())
230230
logger.info(f"Found {len(external_locations)} external locations")
231231
compatible_roles = self.load_uc_compatible_roles()
232232
roles: dict[str, AWSCredentialCandidate] = {}

src/databricks/labs/ucx/aws/locations.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def __init__(
2121
external_locations: ExternalLocations,
2222
aws_resource_permissions: AWSResourcePermissions,
2323
principal_acl: PrincipalACL,
24+
*,
2425
enable_hms_federation: bool = False,
2526
):
2627
self._ws = ws

src/databricks/labs/ucx/azure/locations.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def __init__(
2020
resource_permissions: AzureResourcePermissions,
2121
azurerm: AzureResources,
2222
principal_acl: PrincipalACL,
23+
*,
2324
enable_hms_federation: bool = False,
2425
):
2526
self._ws = ws

src/databricks/labs/ucx/cli.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -870,10 +870,10 @@ def export_assessment(w: WorkspaceClient, prompts: Prompts):
870870

871871

872872
@ucx.command
873-
def create_federated_catalog(w: WorkspaceClient, _: Prompts):
873+
def create_federated_catalog(w: WorkspaceClient, prompts: Prompts):
874874
"""(Experimental) Create federated catalog from current workspace Hive Metastore."""
875875
ctx = WorkspaceContext(w)
876-
ctx.federation.register_internal_hms_as_federated_catalog()
876+
ctx.federation.create_from_cli(prompts)
877877

878878

879879
@ucx.command

src/databricks/labs/ucx/contexts/application.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ def external_locations(self) -> ExternalLocations:
415415
self.inventory_database,
416416
self.tables_crawler,
417417
self.mounts_crawler,
418-
self.config.enable_hms_federation,
418+
enable_hms_federation=self.config.enable_hms_federation,
419419
)
420420

421421
@cached_property

src/databricks/labs/ucx/contexts/workspace_cli.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def external_locations_migration(self) -> AWSExternalLocationsMigration | Extern
118118
self.external_locations,
119119
self.aws_resource_permissions,
120120
self.principal_acl,
121-
self.config.enable_hms_federation,
121+
enable_hms_federation=self.config.enable_hms_federation,
122122
)
123123
if self.is_azure:
124124
return ExternalLocationsMigration(
@@ -127,7 +127,7 @@ def external_locations_migration(self) -> AWSExternalLocationsMigration | Extern
127127
self.azure_resource_permissions,
128128
self.azure_resources,
129129
self.principal_acl,
130-
self.config.enable_hms_federation,
130+
enable_hms_federation=self.config.enable_hms_federation,
131131
)
132132
raise NotImplementedError
133133

@@ -200,7 +200,8 @@ def federation(self):
200200
self.workspace_client,
201201
self.external_locations,
202202
self.workspace_info,
203-
self.config.enable_hms_federation,
203+
self.config,
204+
enable_hms_federation=self.config.enable_hms_federation,
204205
)
205206

206207

src/databricks/labs/ucx/hive_metastore/federation.py

Lines changed: 163 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
import collections
22
import logging
3+
import re
4+
from dataclasses import dataclass, replace
5+
from functools import cached_property
6+
from typing import ClassVar
7+
from packaging.version import Version, InvalidVersion
8+
39

410
from databricks.labs.blueprint.installation import Installation
11+
from databricks.labs.blueprint.tui import Prompts
512
from databricks.sdk import WorkspaceClient
613
from databricks.sdk.errors import AlreadyExists, NotFound, BadRequest
714
from databricks.sdk.service.catalog import (
@@ -14,13 +21,38 @@
1421
)
1522

1623
from databricks.labs.ucx.account.workspaces import WorkspaceInfo
24+
from databricks.labs.ucx.assessment.secrets import SecretsMixin
1725
from databricks.labs.ucx.config import WorkspaceConfig
1826
from databricks.labs.ucx.hive_metastore import ExternalLocations
1927

2028

2129
logger = logging.getLogger(__name__)
2230

2331

32+
@dataclass
33+
class ExternalHmsInfo:
34+
"""
35+
This is a dataclass that represents the external Hive Metastore connection information.
36+
It supports non glue external metastores.
37+
"""
38+
39+
database_type: str
40+
host: str
41+
port: str
42+
database: str
43+
user: str | None
44+
password: str | None
45+
version: str | None
46+
47+
def as_dict(self) -> dict[str, str]:
48+
return {
49+
"database": self.database,
50+
"db_type": self.database_type,
51+
"host": self.host,
52+
"port": self.port,
53+
}
54+
55+
2456
class HiveMetastoreFederationEnabler:
2557
def __init__(self, installation: Installation):
2658
self._installation = installation
@@ -31,61 +63,174 @@ def enable(self):
3163
self._installation.save(config)
3264

3365

34-
class HiveMetastoreFederation:
66+
class HiveMetastoreFederation(SecretsMixin):
3567
def __init__(
3668
self,
37-
workspace_client: WorkspaceClient,
69+
ws: WorkspaceClient,
3870
external_locations: ExternalLocations,
3971
workspace_info: WorkspaceInfo,
72+
config: WorkspaceConfig,
73+
*,
4074
enable_hms_federation: bool = False,
4175
):
42-
self._workspace_client = workspace_client
76+
self._ws = ws
4377
self._external_locations = external_locations
4478
self._workspace_info = workspace_info
4579
self._enable_hms_federation = enable_hms_federation
80+
self._config = config
81+
82+
# Supported databases and version for HMS Federation
83+
supported_database_versions: ClassVar[dict[str, list[str]]] = {
84+
"mysql": ["2.3", "0.13"],
85+
}
4686

47-
def register_internal_hms_as_federated_catalog(self) -> CatalogInfo:
87+
def create_from_cli(self, prompts: Prompts) -> None:
4888
if not self._enable_hms_federation:
4989
raise RuntimeWarning('Run `databricks labs ucx enable-hms-federation` to enable HMS Federation')
50-
name = self._workspace_info.current()
51-
connection_info = self._get_or_create_connection(name)
90+
91+
name = prompts.question(
92+
'Enter the name of the Hive Metastore connection and catalog', default=self._workspace_info.current()
93+
)
94+
95+
if self._external_hms and prompts.confirm(
96+
f'A supported external Hive Metastore connection was identified: {self._external_hms.database_type}. '
97+
f'Use this connection?'
98+
):
99+
connection_info = self._get_or_create_ext_connection(name, self._external_hms)
100+
else:
101+
connection_info = self._get_or_create_int_connection(name)
102+
52103
assert connection_info.name is not None
104+
self._register_federated_catalog(connection_info)
105+
106+
@cached_property
107+
def _external_hms(self) -> ExternalHmsInfo | None:
108+
if not self._config.spark_conf:
109+
logger.info('Spark config not found')
110+
return None
111+
spark_config = self._config.spark_conf
112+
jdbc_url = self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionURL')
113+
if not jdbc_url:
114+
logger.info('JDBC URL not found')
115+
return None
116+
version_value = self._get_value_from_config_key(spark_config, 'spark.sql.hive.metastore.version')
117+
if not version_value:
118+
logger.info('Hive Metastore version not found')
119+
return None
53120
try:
54-
return self._workspace_client.catalogs.create(
121+
version = Version(version_value)
122+
except InvalidVersion:
123+
logger.info('Hive Metastore version is not valid')
124+
return None
125+
major_minor_version = f"{version.major}.{version.minor}"
126+
external_hms = replace(self._split_jdbc_url(jdbc_url), version=major_minor_version)
127+
supported_versions = self.supported_database_versions.get(external_hms.database_type)
128+
if not supported_versions:
129+
logger.info(f'Unsupported Hive Metastore: {external_hms.database_type}')
130+
return None
131+
if major_minor_version not in supported_versions:
132+
logger.info(f'Unsupported Hive Metastore Version: {external_hms.database_type} - {version}')
133+
return None
134+
135+
if not external_hms.user:
136+
external_hms = replace(
137+
external_hms,
138+
user=self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionUserName'),
139+
)
140+
if not external_hms.password:
141+
external_hms = replace(
142+
external_hms,
143+
password=self._get_value_from_config_key(
144+
spark_config, 'spark.hadoop.javax.jdo.option.ConnectionPassword'
145+
),
146+
)
147+
return external_hms
148+
149+
@classmethod
150+
def _split_jdbc_url(cls, jdbc_url: str) -> ExternalHmsInfo:
151+
# Define the regex pattern to match the JDBC URL components
152+
pattern = re.compile(
153+
r'jdbc:(?P<db_type>[a-zA-Z0-9]+)://(?P<host>[^:/]+):(?P<port>\d+)/(?P<database>[^?]+)(\?user=(?P<user>[^&]+)&password=(?P<password>[^&]+))?'
154+
)
155+
match = pattern.match(jdbc_url)
156+
if not match:
157+
raise ValueError(f'Unsupported JDBC URL: {jdbc_url}')
158+
159+
db_type = match.group('db_type')
160+
host = match.group('host')
161+
port = match.group('port')
162+
database = match.group('database')
163+
user = match.group('user')
164+
password = match.group('password')
165+
166+
return ExternalHmsInfo(db_type, host, port, database, user, password, None)
167+
168+
def _register_federated_catalog(
169+
self,
170+
connection_info,
171+
) -> CatalogInfo:
172+
try:
173+
return self._ws.catalogs.create(
55174
name=connection_info.name,
56175
connection_name=connection_info.name,
57176
options={"authorized_paths": self._get_authorized_paths()},
58177
)
59178
except BadRequest as err:
60179
if err.error_code == 'CATALOG_ALREADY_EXISTS':
61180
logger.info(f'Catalog {connection_info.name} already exists')
62-
for catalog_info in self._workspace_client.catalogs.list():
181+
for catalog_info in self._ws.catalogs.list():
63182
if catalog_info.name == connection_info.name:
64183
return catalog_info
65184
raise err
66185

67-
def _get_or_create_connection(self, name: str) -> ConnectionInfo:
186+
def _get_or_create_int_connection(self, name: str) -> ConnectionInfo:
68187
try:
69-
return self._workspace_client.connections.create(
188+
return self._ws.connections.create(
70189
name=name,
71190
connection_type=ConnectionType.HIVE_METASTORE, # needs SDK change
72191
options={"builtin": "true"},
73192
)
74193
except AlreadyExists:
75-
for connection in self._workspace_client.connections.list():
76-
if connection.name == name:
77-
return connection
194+
return self._get_existing_connection(name)
195+
196+
def _get_existing_connection(self, name: str) -> ConnectionInfo:
197+
for connection in self._ws.connections.list():
198+
if connection.name == name:
199+
return connection
78200
raise NotFound(f'Connection {name} not found')
79201

202+
def _get_or_create_ext_connection(self, name: str, external_hms: ExternalHmsInfo) -> ConnectionInfo:
203+
options = external_hms.as_dict()
204+
if external_hms.user:
205+
options["user"] = external_hms.user
206+
if external_hms.password:
207+
options["password"] = external_hms.password
208+
if external_hms.version:
209+
options["version"] = external_hms.version
210+
try:
211+
return self._ws.connections.create(
212+
name=name,
213+
connection_type=ConnectionType.HIVE_METASTORE, # needs SDK change
214+
options=options,
215+
)
216+
except AlreadyExists:
217+
return self._get_existing_connection(name)
218+
80219
def _get_authorized_paths(self) -> str:
81220
existing = {}
82-
for external_location in self._workspace_client.external_locations.list():
221+
for external_location in self._ws.external_locations.list():
83222
existing[external_location.url] = external_location
84223
authorized_paths = []
85-
current_user = self._workspace_client.current_user.me()
224+
current_user = self._ws.current_user.me()
86225
if not current_user.user_name:
87226
raise NotFound('Current user not found')
88-
for external_location_info in self._external_locations.external_locations_with_root():
227+
# Get the external locations. If not using external HMS, include the root DBFS location.
228+
if self._external_hms is not None:
229+
external_locations = self._external_locations.external_locations_with_root()
230+
else:
231+
external_locations = self._external_locations.snapshot()
232+
233+
for external_location_info in external_locations:
89234
location = ExternalLocations.clean_location(external_location_info.location)
90235
existing_location = existing.get(location)
91236
if not existing_location:
@@ -103,11 +248,11 @@ def _add_missing_permissions_if_needed(self, location_name: str, current_user: s
103248
grants = self._location_grants(location_name)
104249
if Privilege.CREATE_FOREIGN_SECURABLE not in grants[current_user]:
105250
change = PermissionsChange(principal=current_user, add=[Privilege.CREATE_FOREIGN_SECURABLE])
106-
self._workspace_client.grants.update(SecurableType.EXTERNAL_LOCATION, location_name, changes=[change])
251+
self._ws.grants.update(SecurableType.EXTERNAL_LOCATION, location_name, changes=[change])
107252

108253
def _location_grants(self, location_name: str) -> dict[str, set[Privilege]]:
109254
grants: dict[str, set[Privilege]] = collections.defaultdict(set)
110-
result = self._workspace_client.grants.get(SecurableType.EXTERNAL_LOCATION, location_name)
255+
result = self._ws.grants.get(SecurableType.EXTERNAL_LOCATION, location_name)
111256
if not result.privilege_assignments:
112257
return grants
113258
for assignment in result.privilege_assignments:

src/databricks/labs/ucx/hive_metastore/locations.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ def __init__(
155155
schema: str,
156156
tables_crawler: TablesCrawler,
157157
mounts_crawler: 'MountsCrawler',
158+
*,
158159
enable_hms_federation: bool = False,
159160
):
160161
super().__init__(sql_backend, "hive_metastore", schema, "external_locations", ExternalLocation)
@@ -174,21 +175,20 @@ def clean_location(location: str) -> str:
174175
# Having s3a and s3 as separate locations will cause issues when trying to find overlapping locations
175176
return re.sub(r"^s3a:/", r"s3:/", location).rstrip("/")
176177

177-
def external_locations_with_root(self) -> list[ExternalLocation]:
178+
def external_locations_with_root(self) -> Iterable[ExternalLocation]:
178179
"""
179180
Produces a list of external locations with the DBFS root location appended to the list.
180181
Utilizes the snapshot method.
181182
Used for HMS Federation.
182183
183-
Returns:
184-
List of ExternalLocation objects
184+
Yields:
185+
Iterable[Result]: Combination of all the external locations and the DBFS root location
185186
"""
186187

187-
external_locations = list(self.snapshot())
188+
yield from self.snapshot()
188189
dbfs_root = self._get_dbfs_root()
189190
if dbfs_root:
190-
external_locations.append(dbfs_root)
191-
return external_locations
191+
yield dbfs_root
192192

193193
def _get_dbfs_root(self) -> ExternalLocation | None:
194194
"""

tests/integration/hive_metastore/test_federation.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from unittest.mock import create_autospec
22

33
import pytest
4+
from databricks.labs.blueprint.tui import MockPrompts
45
from databricks.sdk import WorkspaceClient
56

67
from databricks.labs.ucx.account.workspaces import WorkspaceInfo
@@ -14,13 +15,14 @@ def ws():
1415

1516

1617
@pytest.mark.skip("needs to be enabled")
17-
def test_federation(ws, sql_backend):
18+
def test_federation(ws, ctx, sql_backend):
1819
schema = 'ucx'
1920
tables_crawler = TablesCrawler(sql_backend, schema)
2021
mounts_crawler = MountsCrawler(sql_backend, ws, schema)
2122
external_locations = ExternalLocations(ws, sql_backend, schema, tables_crawler, mounts_crawler)
2223
workspace_info = create_autospec(WorkspaceInfo)
2324
workspace_info.current.return_value = 'some_thing'
24-
federation = HiveMetastoreFederation(ws, external_locations, workspace_info)
25-
federation.register_internal_hms_as_federated_catalog()
25+
federation = HiveMetastoreFederation(ws, external_locations, workspace_info, ctx.config)
26+
prompts = MockPrompts({})
27+
federation.create_from_cli(prompts)
2628
workspace_info.current.assert_called_once()

0 commit comments

Comments (0)