1
1
import collections
2
2
import logging
3
3
from dataclasses import replace
4
- import fnmatch
5
4
from pathlib import PurePath
6
5
7
6
from databricks .labs .blueprint .tui import Prompts
@@ -30,7 +29,7 @@ def __init__(
30
29
):
31
30
self ._ws = ws
32
31
self ._table_mapping = table_mapping
33
- self ._external_locations = self ._ws .external_locations .list ()
32
+ self ._external_locations = list ( self ._ws .external_locations .list () )
34
33
self ._principal_grants = principal_grants
35
34
self ._backend = sql_backend
36
35
self ._hive_grants_crawler = grants_crawler
@@ -45,32 +44,19 @@ def create_ucx_catalog(self, prompts: Prompts, *, properties: dict[str, str] | N
45
44
properties : (dict[str, str] | None), default None
46
45
The properties to pass to the catalog. If None, no properties are passed.
47
46
"""
48
- try :
49
- self ._create_catalog_validate (self ._ucx_catalog , prompts , properties = properties )
50
- except BadRequest as e :
51
- if "already exists" in str (e ):
52
- logger .warning (f"Catalog '{ self ._ucx_catalog } ' already exists. Skipping." )
53
- return
54
- raise
47
+ self ._create_catalog_validate (self ._ucx_catalog , prompts , properties = properties )
55
48
56
49
def create_all_catalogs_schemas (self , prompts : Prompts ) -> None :
57
50
candidate_catalogs , candidate_schemas = self ._get_missing_catalogs_schemas ()
58
51
for candidate_catalog in candidate_catalogs :
59
- try :
60
- self ._create_catalog_validate (candidate_catalog , prompts , properties = None )
61
- except BadRequest as e :
62
- if "already exists" in str (e ):
63
- logger .warning (f"Catalog '{ candidate_catalog } ' already exists. Skipping." )
64
- continue
52
+ self ._create_catalog_validate (candidate_catalog , prompts , properties = None )
65
53
for candidate_catalog , schemas in candidate_schemas .items ():
66
54
for candidate_schema in schemas :
67
55
try :
68
56
self ._create_schema (candidate_catalog , candidate_schema )
69
57
except BadRequest as e :
70
58
if "already exists" in str (e ):
71
- logger .warning (
72
- f"Schema '{ candidate_schema } ' in catalog '{ candidate_catalog } ' already exists. Skipping."
73
- )
59
+ logger .warning (f"Skipping already existing schema: { candidate_catalog } .{ candidate_schema } " )
74
60
continue
75
61
self ._apply_from_legacy_table_acls ()
76
62
self ._update_principal_acl ()
@@ -141,19 +127,28 @@ def _get_database_source_target_mapping(self) -> dict[str, list[SchemaInfo]]:
141
127
src_trg_schema_mapping [table_mapping .src_schema ].append (schema )
142
128
return src_trg_schema_mapping
143
129
144
def _create_catalog_validate(
    self, catalog_name: str, prompts: Prompts, *, properties: dict[str, str] | None
) -> None:
    """Create a UC catalog after asking for, and validating, its storage location.

    If the workspace already has a catalog with this name, a warning is logged
    and nothing is created. Otherwise the user is asked for a storage location
    (default "metastore") up to three times; the first answer accepted by
    `_validate_location` is passed to `_create_catalog`.

    Args:
        catalog_name: Name of the catalog to create.
        prompts: Used to ask the user for the storage location.
        properties: Forwarded unchanged to `_create_catalog`.

    Raises:
        NotFound: When three consecutive answers fail location validation.
    """
    try:
        catalog = self._ws.catalogs.get(catalog_name)
    except NotFound:
        # The lookup raising NotFound is the signal that the catalog must be created.
        catalog = None
    if catalog:
        logger.warning(f"Skipping already existing catalog: {catalog_name}")
        return
    logger.info(f"Validating UC catalog: {catalog_name}")
    attempts = 3
    while True:
        catalog_storage = prompts.question(
            f"Please provide storage location url for catalog: {catalog_name}", default="metastore"
        )
        if self._validate_location(catalog_storage):
            break
        attempts -= 1
        if attempts == 0:
            raise NotFound(f"Failed to validate location for catalog: {catalog_name}")
    self._create_catalog(catalog_name, catalog_storage, properties=properties)
157
152
158
153
def _list_existing (self ) -> tuple [set [str ], dict [str , set [str ]]]:
159
154
"""generate a list of existing UC catalogs and schema."""
@@ -203,19 +198,18 @@ def _get_missing_catalogs_schemas(self) -> tuple[set[str], dict[str, set[str]]]:
203
198
target_schemas [catalog ] = target_schemas [catalog ] - schemas
204
199
return target_catalogs , target_schemas
205
200
206
def _validate_location(self, location: str) -> bool:
    """Return True when `location` is usable as a catalog storage location.

    The literal "metastore" is always accepted. Any other value is rejected if
    constructing a `PurePath` from it raises `ValueError`; otherwise it must be
    equal to, or nested under, the URL of one of `self._external_locations`.

    Args:
        location: The storage location answer to validate.

    Returns:
        True if the location is accepted, False otherwise. Rejections are logged.
    """
    if location == "metastore":
        return True
    try:
        PurePath(location)
    except ValueError:
        logger.error(f"Invalid location path: {location}")
        return False
    # Trailing slashes are not significant: "s3://b/base/" and "s3://b/base" name the same place.
    candidate = location.rstrip("/")
    for external_location in self._external_locations:
        url = external_location.url
        if not url:
            # A missing or empty URL must never match: "".startswith-style checks accept everything.
            continue
        root = url.rstrip("/")
        if not root:
            continue
        # Match only on a path-segment boundary, so "s3://b/base" does not
        # accept the unrelated sibling "s3://b/basement".
        if candidate == root or candidate.startswith(root + "/"):
            return True
    logger.warning(f"No matching external location found for: {location}")
    return False
220
214
221
215
def _create_catalog (self , catalog : str , catalog_storage : str , * , properties : dict [str , str ] | None ) -> None :
0 commit comments