Skip to content

Commit ef937c4

Browse files
authored
Move TablesMigrate to a separate module (#747)
1 parent 2552cf7 commit ef937c4

File tree

5 files changed

+188
-179
lines changed

5 files changed

+188
-179
lines changed

src/databricks/labs/ucx/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from databricks.labs.ucx.framework.tui import Prompts
1212
from databricks.labs.ucx.hive_metastore import ExternalLocations, TablesCrawler
1313
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
14-
from databricks.labs.ucx.hive_metastore.tables import TablesMigrate
14+
from databricks.labs.ucx.hive_metastore.table_migrate import TablesMigrate
1515
from databricks.labs.ucx.install import WorkspaceInstaller
1616
from databricks.labs.ucx.installer import InstallationManager
1717

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import logging
2+
from collections import defaultdict
3+
from functools import partial
4+
5+
from databricks.sdk import WorkspaceClient
6+
7+
from databricks.labs.ucx.framework.crawlers import SqlBackend
8+
from databricks.labs.ucx.framework.parallel import Threads
9+
from databricks.labs.ucx.hive_metastore import TablesCrawler
10+
from databricks.labs.ucx.hive_metastore.tables import MigrationCount, Table
11+
12+
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
13+
14+
15+
class TablesMigrate:
    """Migrate the tables reported by a ``TablesCrawler`` snapshot and revert such migrations.

    Migration state is tracked in ``self._seen_tables``, which maps the lower-cased
    full name of a target table to the key of the source table it was upgraded from.
    """

    def __init__(
        self,
        tc: TablesCrawler,
        ws: WorkspaceClient,
        backend: SqlBackend,
        default_catalog=None,
        database_to_catalog_mapping: dict[str, str] | None = None,
    ):
        """Store the collaborators and resolve the default target catalog.

        Args:
            tc: crawler whose ``snapshot()`` yields the tables to migrate or revert.
            ws: workspace client used to list catalogs, schemas and tables.
            backend: SQL backend used to run the migration and revert statements.
            default_catalog: target catalog for databases without an explicit mapping;
                a falsy value selects ``"ucx_default"``.
            database_to_catalog_mapping: optional mapping from source database name
                to target catalog name.
        """
        self._tc = tc
        self._backend = backend
        self._ws = ws
        self._database_to_catalog_mapping = database_to_catalog_mapping
        self._default_catalog = self._init_default_catalog(default_catalog)
        # target full name (lower-cased) -> key of the source table it was upgraded from
        self._seen_tables: dict[str, str] = {}

    @staticmethod
    def _init_default_catalog(default_catalog):
        """Return ``default_catalog`` when it is truthy, otherwise ``"ucx_default"``."""
        # TODO : Fetch current workspace name and append it to the default catalog.
        return default_catalog or "ucx_default"

    def migrate_tables(self):
        """Migrate every table of the crawler snapshot, running one task per table.

        Raises:
            ValueError: if at least one migration task reported an error; the message
                concatenates all collected errors.
        """
        self._init_seen_tables()
        mapping = self._database_to_catalog_mapping or {}
        tasks = []
        for table in self._tc.snapshot():
            # A partial mapping must not abort the whole run with a KeyError:
            # databases without an entry go to the default catalog.
            target_catalog = mapping.get(table.database, self._default_catalog)
            tasks.append(partial(self._migrate_table, target_catalog, table))
        _, errors = Threads.gather("migrate tables", tasks)
        if errors:
            # TODO: https://github.com/databrickslabs/ucx/issues/406
            # TODO: pick first X issues in the summary
            msg = f"Detected {len(errors)} errors: {'. '.join(str(e) for e in errors)}"
            raise ValueError(msg)

    def _migrate_table(self, target_catalog: str, table: Table):
        """Migrate a single table into ``target_catalog`` unless its target is already known.

        Returns:
            ``True`` when the table was migrated or was already upgraded.

        Raises:
            ValueError: if the object type is neither ``MANAGED`` nor ``EXTERNAL``, or if
                the statement for an external table yields no result or a non-``SUCCESS``
                status.
        """
        sql = table.uc_create_sql(target_catalog)
        target = f"{target_catalog}.{table.database}.{table.name}".lower()
        logger.debug(f"Migrating table {table.key} to {target} using SQL query: {sql}")

        if self._table_already_upgraded(target):
            # _seen_tables[target] holds the *source* key, so report the target itself.
            logger.info(f"Table {table.key} already upgraded to {target}")
            return True
        if table.object_type not in ("MANAGED", "EXTERNAL"):
            msg = f"Table {table.key} is a {table.object_type} and is not supported for migration yet"
            raise ValueError(msg)

        if table.object_type == "MANAGED":
            self._backend.execute(sql)
        else:
            # External tables report their outcome in the first row of the result.
            result = next(self._backend.fetch(sql), None)
            if result is None:
                msg = f"No result returned while migrating external table {table.key}"
                raise ValueError(msg)
            if result.status_code != "SUCCESS":
                raise ValueError(result.description)

        self._backend.execute(table.sql_alter_to(target_catalog))
        self._backend.execute(table.sql_alter_from(target_catalog))
        self._seen_tables[target] = table.key
        return True

    def _init_seen_tables(self):
        """Record every workspace table that carries an ``upgraded_from`` property.

        Walks all catalogs, schemas and tables of the workspace and stores
        ``full_name.lower() -> upgraded_from.lower()`` in ``self._seen_tables``.
        """
        for catalog in self._ws.catalogs.list():
            for schema in self._ws.schemas.list(catalog_name=catalog.name):
                for table in self._ws.tables.list(catalog_name=catalog.name, schema_name=schema.name):
                    if table.properties is not None and "upgraded_from" in table.properties:
                        self._seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower()

    def _table_already_upgraded(self, target) -> bool:
        """Return whether ``target`` is a known migration target."""
        return target in self._seen_tables

    def _get_tables_to_revert(self, schema: str | None = None, table: str | None = None) -> list[Table]:
        """Return the snapshot tables that are recorded as sources of a migration.

        Args:
            schema: optional database name (case-insensitive) to restrict the result to.
            table: optional table name (case-insensitive); only valid together with ``schema``.

        Returns:
            The matching tables, or an empty list when ``table`` is given without ``schema``.
        """
        schema = schema.lower() if schema else None
        table = table.lower() if table else None
        if table and not schema:
            # The request is rejected, so select nothing instead of matching the
            # table name across every schema.
            logger.error("Cannot accept 'Table' parameter without 'Schema' parameter")
            return []
        if not self._seen_tables:
            self._init_seen_tables()

        # Build the lookup once instead of scanning the dict values for every table.
        migrated_sources = set(self._seen_tables.values())
        upgraded_tables = []
        for cur_table in self._tc.snapshot():
            if schema and cur_table.database != schema:
                continue
            if table and cur_table.name != table:
                continue
            if cur_table.key in migrated_sources:
                upgraded_tables.append(cur_table)
        return upgraded_tables

    def revert_migrated_tables(
        self, schema: str | None = None, table: str | None = None, *, delete_managed: bool = False
    ):
        """Revert the migrated tables in scope.

        Views and external tables are always reverted; other tables are reverted only
        when ``delete_managed`` is set and are logged as skipped otherwise.
        """
        upgraded_tables = self._get_tables_to_revert(schema=schema, table=table)
        # reverses the _seen_tables dictionary to key by the source table
        reverse_seen = {v: k for (k, v) in self._seen_tables.items()}
        tasks = []
        for upgraded_table in upgraded_tables:
            if upgraded_table.kind == "VIEW" or upgraded_table.object_type == "EXTERNAL" or delete_managed:
                tasks.append(partial(self._revert_migrated_table, upgraded_table, reverse_seen[upgraded_table.key]))
                continue
            logger.info(
                f"Skipping {upgraded_table.object_type} Table {upgraded_table.database}.{upgraded_table.name} "
                f"upgraded_to {upgraded_table.upgraded_to}"
            )
        Threads.strict("revert migrated tables", tasks)

    def _revert_migrated_table(self, table: Table, target_table_key: str):
        """Revert one migration.

        Executes the statement built by ``table.sql_unset_upgraded_to("hive_metastore")``
        and then drops ``target_table_key`` if it exists.
        """
        logger.info(
            f"Reverting {table.object_type} table {table.database}.{table.name} upgraded_to {table.upgraded_to}"
        )
        self._backend.execute(table.sql_unset_upgraded_to("hive_metastore"))
        self._backend.execute(f"DROP {table.kind} IF EXISTS {target_table_key}")

    def _get_revert_count(self, schema: str | None = None, table: str | None = None) -> list[MigrationCount]:
        """Count the revertable views, external tables and managed tables per database.

        Only tables whose ``upgraded_to`` is set are counted. Every database that has at
        least one table in scope gets an entry, even when all of its counters are zero.
        """
        table_by_database = defaultdict(list)
        for cur_table in self._get_tables_to_revert(schema=schema, table=table):
            table_by_database[cur_table.database].append(cur_table)

        migration_list = []
        for cur_database, tables in table_by_database.items():
            external_tables = 0
            managed_tables = 0
            views = 0
            for current_table in tables:
                if current_table.upgraded_to is None:
                    continue
                # A view is counted as a view regardless of its object type.
                if current_table.kind == "VIEW":
                    views += 1
                elif current_table.object_type == "EXTERNAL":
                    external_tables += 1
                elif current_table.object_type == "MANAGED":
                    managed_tables += 1
            migration_list.append(
                MigrationCount(
                    database=cur_database, managed_tables=managed_tables, external_tables=external_tables, views=views
                )
            )
        return migration_list

    def is_upgraded(self, schema: str, table: str) -> bool:
        """Return whether ``SHOW TBLPROPERTIES`` for the table lists an ``upgraded_to`` key."""
        result = self._backend.fetch(f"SHOW TBLPROPERTIES `{schema}`.`{table}`")
        for value in result:
            if value["key"] == "upgraded_to":
                logger.info(f"{schema}.{table} is set as upgraded")
                return True
        logger.info(f"{schema}.{table} is set as not upgraded")
        return False

    def print_revert_report(self, *, delete_managed: bool) -> bool | None:
        """Print a per-database summary of what a revert would affect.

        Returns:
            ``False`` when no migrated tables were found, ``True`` after printing the report.
        """
        migrated_count = self._get_revert_count()
        if not migrated_count:
            logger.info("No migrated tables were found.")
            return False
        print("The following is the count of migrated tables and views found in scope:")
        # Header columns use the same widths as the rows below (88 characters in total).
        print(f"{'Database':<30}| {'External Tables':<16} | {'Managed Table':<16} | {'Views':<16} |")
        print("=" * 88)
        for count in migrated_count:
            print(f"{count.database:<30}| {count.external_tables:16} | {count.managed_tables:16} | {count.views:16} |")
        print("=" * 88)
        print("Migrated External Tables and Views (targets) will be deleted")
        if delete_managed:
            print("Migrated Managed Tables (targets) will be deleted")
        else:
            print("Migrated Managed Tables (targets) will be left intact.")
            print("To revert and delete Migrated Tables, add --delete_managed true flag to the command.")
        return True

src/databricks/labs/ucx/hive_metastore/tables.py

Lines changed: 0 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
import logging
22
import re
33
import typing
4-
from collections import defaultdict
54
from collections.abc import Iterable, Iterator
65
from dataclasses import dataclass
76
from functools import partial
87

9-
from databricks.sdk import WorkspaceClient
10-
118
from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend
129
from databricks.labs.ucx.framework.parallel import Threads
1310
from databricks.labs.ucx.mixins.sql import Row
@@ -218,176 +215,3 @@ def _describe(self, catalog: str, database: str, table: str) -> Table | None:
218215
# TODO: https://github.com/databrickslabs/ucx/issues/406
219216
logger.error(f"Couldn't fetch information for table {full_name} : {e}")
220217
return None
221-
222-
223-
class TablesMigrate:
224-
def __init__(
225-
self,
226-
tc: TablesCrawler,
227-
ws: WorkspaceClient,
228-
backend: SqlBackend,
229-
default_catalog=None,
230-
database_to_catalog_mapping: dict[str, str] | None = None,
231-
):
232-
self._tc = tc
233-
self._backend = backend
234-
self._ws = ws
235-
self._database_to_catalog_mapping = database_to_catalog_mapping
236-
self._default_catalog = self._init_default_catalog(default_catalog)
237-
self._seen_tables: dict[str, str] = {}
238-
239-
@staticmethod
240-
def _init_default_catalog(default_catalog):
241-
if default_catalog:
242-
return default_catalog
243-
else:
244-
return "ucx_default" # TODO : Fetch current workspace name and append it to the default catalog.
245-
246-
def migrate_tables(self):
247-
self._init_seen_tables()
248-
tasks = []
249-
for table in self._tc.snapshot():
250-
target_catalog = self._default_catalog
251-
if self._database_to_catalog_mapping:
252-
target_catalog = self._database_to_catalog_mapping[table.database]
253-
tasks.append(partial(self._migrate_table, target_catalog, table))
254-
_, errors = Threads.gather("migrate tables", tasks)
255-
if len(errors) > 0:
256-
# TODO: https://github.com/databrickslabs/ucx/issues/406
257-
# TODO: pick first X issues in the summary
258-
msg = f"Detected {len(errors)} errors: {'. '.join(str(e) for e in errors)}"
259-
raise ValueError(msg)
260-
261-
def _migrate_table(self, target_catalog: str, table: Table):
262-
sql = table.uc_create_sql(target_catalog)
263-
logger.debug(f"Migrating table {table.key} to using SQL query: {sql}")
264-
target = f"{target_catalog}.{table.database}.{table.name}".lower()
265-
266-
if self._table_already_upgraded(target):
267-
logger.info(f"Table {table.key} already upgraded to {self._seen_tables[target]}")
268-
elif table.object_type == "MANAGED":
269-
self._backend.execute(sql)
270-
self._backend.execute(table.sql_alter_to(target_catalog))
271-
self._backend.execute(table.sql_alter_from(target_catalog))
272-
self._seen_tables[target] = table.key
273-
elif table.object_type == "EXTERNAL":
274-
result = next(self._backend.fetch(sql))
275-
if result.status_code != "SUCCESS":
276-
raise ValueError(result.description)
277-
self._backend.execute(table.sql_alter_to(target_catalog))
278-
self._backend.execute(table.sql_alter_from(target_catalog))
279-
self._seen_tables[target] = table.key
280-
else:
281-
msg = f"Table {table.key} is a {table.object_type} and is not supported for migration yet"
282-
raise ValueError(msg)
283-
return True
284-
285-
def _init_seen_tables(self):
286-
for catalog in self._ws.catalogs.list():
287-
for schema in self._ws.schemas.list(catalog_name=catalog.name):
288-
for table in self._ws.tables.list(catalog_name=catalog.name, schema_name=schema.name):
289-
if table.properties is not None and "upgraded_from" in table.properties:
290-
self._seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower()
291-
292-
def _table_already_upgraded(self, target) -> bool:
293-
return target in self._seen_tables
294-
295-
def _get_tables_to_revert(self, schema: str | None = None, table: str | None = None) -> list[Table]:
296-
schema = schema.lower() if schema else None
297-
table = table.lower() if table else None
298-
upgraded_tables = []
299-
if table and not schema:
300-
logger.error("Cannot accept 'Table' parameter without 'Schema' parameter")
301-
if len(self._seen_tables) == 0:
302-
self._init_seen_tables()
303-
304-
for cur_table in self._tc.snapshot():
305-
if schema and cur_table.database != schema:
306-
continue
307-
if table and cur_table.name != table:
308-
continue
309-
if cur_table.key in self._seen_tables.values():
310-
upgraded_tables.append(cur_table)
311-
return upgraded_tables
312-
313-
def revert_migrated_tables(
314-
self, schema: str | None = None, table: str | None = None, *, delete_managed: bool = False
315-
):
316-
upgraded_tables = self._get_tables_to_revert(schema=schema, table=table)
317-
# reverses the _seen_tables dictionary to key by the source table
318-
reverse_seen = {v: k for (k, v) in self._seen_tables.items()}
319-
tasks = []
320-
for upgraded_table in upgraded_tables:
321-
if upgraded_table.kind == "VIEW" or upgraded_table.object_type == "EXTERNAL" or delete_managed:
322-
tasks.append(partial(self._revert_migrated_table, upgraded_table, reverse_seen[upgraded_table.key]))
323-
continue
324-
logger.info(
325-
f"Skipping {upgraded_table.object_type} Table {upgraded_table.database}.{upgraded_table.name} "
326-
f"upgraded_to {upgraded_table.upgraded_to}"
327-
)
328-
Threads.strict("revert migrated tables", tasks)
329-
330-
def _revert_migrated_table(self, table: Table, target_table_key: str):
331-
logger.info(
332-
f"Reverting {table.object_type} table {table.database}.{table.name} upgraded_to {table.upgraded_to}"
333-
)
334-
self._backend.execute(table.sql_unset_upgraded_to("hive_metastore"))
335-
self._backend.execute(f"DROP {table.kind} IF EXISTS {target_table_key}")
336-
337-
def _get_revert_count(self, schema: str | None = None, table: str | None = None) -> list[MigrationCount]:
338-
upgraded_tables = self._get_tables_to_revert(schema=schema, table=table)
339-
340-
table_by_database = defaultdict(list)
341-
for cur_table in upgraded_tables:
342-
table_by_database[cur_table.database].append(cur_table)
343-
344-
migration_list = []
345-
for cur_database in table_by_database.keys():
346-
external_tables = 0
347-
managed_tables = 0
348-
views = 0
349-
for current_table in table_by_database[cur_database]:
350-
if current_table.upgraded_to is not None:
351-
if current_table.kind == "VIEW":
352-
views += 1
353-
continue
354-
if current_table.object_type == "EXTERNAL":
355-
external_tables += 1
356-
continue
357-
if current_table.object_type == "MANAGED":
358-
managed_tables += 1
359-
continue
360-
migration_list.append(
361-
MigrationCount(
362-
database=cur_database, managed_tables=managed_tables, external_tables=external_tables, views=views
363-
)
364-
)
365-
return migration_list
366-
367-
def is_upgraded(self, schema: str, table: str) -> bool:
368-
result = self._backend.fetch(f"SHOW TBLPROPERTIES `{schema}`.`{table}`")
369-
for value in result:
370-
if value["key"] == "upgraded_to":
371-
logger.info(f"{schema}.{table} is set as upgraded")
372-
return True
373-
logger.info(f"{schema}.{table} is set as not upgraded")
374-
return False
375-
376-
def print_revert_report(self, *, delete_managed: bool) -> bool | None:
377-
migrated_count = self._get_revert_count()
378-
if not migrated_count:
379-
logger.info("No migrated tables were found.")
380-
return False
381-
print("The following is the count of migrated tables and views found in scope:")
382-
print("Database | External Tables | Managed Table | Views |")
383-
print("=" * 88)
384-
for count in migrated_count:
385-
print(f"{count.database:<30}| {count.external_tables:16} | {count.managed_tables:16} | {count.views:16} |")
386-
print("=" * 88)
387-
print("Migrated External Tables and Views (targets) will be deleted")
388-
if delete_managed:
389-
print("Migrated Manged Tables (targets) will be deleted")
390-
else:
391-
print("Migrated Manged Tables (targets) will be left intact.")
392-
print("To revert and delete Migrated Tables, add --delete_managed true flag to the command.")
393-
return True

tests/integration/hive_metastore/test_migrate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from databricks.sdk.errors import NotFound
66
from databricks.sdk.retries import retried
77

8-
from databricks.labs.ucx.hive_metastore.tables import TablesMigrate
8+
from databricks.labs.ucx.hive_metastore.table_migrate import TablesMigrate
99

1010
from ..conftest import StaticTablesCrawler
1111

0 commit comments

Comments
 (0)