Skip to content

Commit bba54e5

Browse files
authored
Added TableMapping functionality to table migrate (#752)
1 parent 06c666c commit bba54e5

File tree

9 files changed

+322
-218
lines changed

9 files changed

+322
-218
lines changed

src/databricks/labs/ucx/cli.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ def revert_migrated_tables(schema: str, table: str, *, delete_managed: bool = Fa
141141
warehouse_id = installation.config.warehouse_id
142142
sql_backend = StatementExecutionBackend(ws, warehouse_id)
143143
table_crawler = TablesCrawler(sql_backend, installation.config.inventory_database)
144-
tm = TablesMigrate(table_crawler, ws, sql_backend)
144+
tmp = TableMapping(ws)
145+
tm = TablesMigrate(table_crawler, ws, sql_backend, tmp)
145146
if tm.print_revert_report(delete_managed=delete_managed) and prompts.confirm(
146147
"Would you like to continue?", max_attempts=2
147148
):

src/databricks/labs/ucx/hive_metastore/mapping.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ def initial(cls, workspace_name: str, catalog_name: str, table: Table) -> "Rule"
3737
dst_table=table.name,
3838
)
3939

40+
@property
def as_uc_table_key(self):
    """Return this rule's destination as a dotted three-part key.

    The result is ``<catalog_name>.<dst_schema>.<dst_table>``; the parts are
    joined as-is, with no quoting or case normalisation.
    """
    catalog, schema, table = self.catalog_name, self.dst_schema, self.dst_table
    return f"{catalog}.{schema}.{table}"
43+
44+
@property
def as_hms_table_key(self):
    """Return this rule's source as a dotted key under ``hive_metastore``.

    The result is ``hive_metastore.<src_schema>.<src_table>``; the schema and
    table names are inserted unchanged.
    """
    schema, table = self.src_schema, self.src_table
    return f"hive_metastore.{schema}.{table}"
47+
4048

4149
class TableMapping:
4250
UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip"

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from databricks.labs.ucx.framework.crawlers import SqlBackend
88
from databricks.labs.ucx.framework.parallel import Threads
99
from databricks.labs.ucx.hive_metastore import TablesCrawler
10+
from databricks.labs.ucx.hive_metastore.mapping import Rule, TableMapping
1011
from databricks.labs.ucx.hive_metastore.tables import MigrationCount, Table
1112

1213
logger = logging.getLogger(__name__)
@@ -18,62 +19,66 @@ def __init__(
1819
tc: TablesCrawler,
1920
ws: WorkspaceClient,
2021
backend: SqlBackend,
21-
default_catalog=None,
22-
database_to_catalog_mapping: dict[str, str] | None = None,
22+
tm: TableMapping,
2323
):
2424
self._tc = tc
2525
self._backend = backend
2626
self._ws = ws
27-
self._database_to_catalog_mapping = database_to_catalog_mapping
28-
self._default_catalog = self._init_default_catalog(default_catalog)
27+
self._tm = tm
2928
self._seen_tables: dict[str, str] = {}
3029

31-
@staticmethod
32-
def _init_default_catalog(default_catalog):
33-
if default_catalog:
34-
return default_catalog
35-
else:
36-
return "ucx_default" # TODO : Fetch current workspace name and append it to the default catalog.
37-
3830
def migrate_tables(self):
3931
self._init_seen_tables()
32+
mapping_rules = self._get_mapping_rules()
4033
tasks = []
4134
for table in self._tc.snapshot():
42-
target_catalog = self._default_catalog
43-
if self._database_to_catalog_mapping:
44-
target_catalog = self._database_to_catalog_mapping[table.database]
45-
tasks.append(partial(self._migrate_table, target_catalog, table))
46-
_, errors = Threads.gather("migrate tables", tasks)
47-
if len(errors) > 0:
48-
# TODO: https://github.com/databrickslabs/ucx/issues/406
49-
# TODO: pick first X issues in the summary
50-
msg = f"Detected {len(errors)} errors: {'. '.join(str(e) for e in errors)}"
51-
raise ValueError(msg)
52-
53-
def _migrate_table(self, target_catalog: str, table: Table):
54-
sql = table.uc_create_sql(target_catalog)
55-
logger.debug(f"Migrating table {table.key} to using SQL query: {sql}")
56-
target = f"{target_catalog}.{table.database}.{table.name}".lower()
57-
58-
if self._table_already_upgraded(target):
59-
logger.info(f"Table {table.key} already upgraded to {self._seen_tables[target]}")
60-
elif table.object_type == "MANAGED":
61-
self._backend.execute(sql)
62-
self._backend.execute(table.sql_alter_to(target_catalog))
63-
self._backend.execute(table.sql_alter_from(target_catalog))
64-
self._seen_tables[target] = table.key
65-
elif table.object_type == "EXTERNAL":
66-
result = next(self._backend.fetch(sql))
67-
if result.status_code != "SUCCESS":
68-
raise ValueError(result.description)
69-
self._backend.execute(table.sql_alter_to(target_catalog))
70-
self._backend.execute(table.sql_alter_from(target_catalog))
71-
self._seen_tables[target] = table.key
72-
else:
73-
msg = f"Table {table.key} is a {table.object_type} and is not supported for migration yet"
74-
raise ValueError(msg)
35+
rule = mapping_rules.get(table.key)
36+
if not rule:
37+
logger.info(f"Skipping table {table.key} table doesn't exist in the mapping table.")
38+
continue
39+
tasks.append(partial(self._migrate_table, table, rule))
40+
Threads.strict("migrate tables", tasks)
41+
42+
def _migrate_table(self, src_table: Table, rule: Rule):
    """Migrate one source table to the destination named by ``rule``.

    Dispatches to the managed-table, view or external-table helper based on
    ``src_table.object_type`` / ``src_table.kind``. Tables whose destination
    key is already recorded as upgraded are left alone.

    Args:
        src_table: the source table to migrate.
        rule: mapping rule whose ``as_uc_table_key`` names the destination.

    Returns:
        ``True`` in every case, including when nothing was migrated.
    """
    target_table_key = rule.as_uc_table_key
    if self._table_already_upgraded(target_table_key):
        logger.info(f"Table {src_table.key} already upgraded to {target_table_key}")
        return True
    if src_table.object_type == "MANAGED":
        return self._migrate_managed_table(src_table, rule)
    if src_table.kind == "VIEW":
        return self._migrate_view(src_table, rule)
    if src_table.object_type == "EXTERNAL":
        return self._migrate_external_table(src_table, rule)
    # No branch matched: report the unsupported type instead of returning
    # silently, so skipped tables are visible in the logs.
    msg = f"Table {src_table.key} is a {src_table.object_type} and is not supported for migration yet"
    logger.info(msg)
    return True
53+
54+
def _migrate_external_table(self, src_table: Table, rule: Rule):
    """Run the migration statement for an external table.

    The statement comes from ``src_table.uc_create_sql`` for the destination
    key of ``rule`` and is executed on the SQL backend. Always returns ``True``.
    """
    statement = src_table.uc_create_sql(rule.as_uc_table_key)
    logger.debug(f"Migrating external table {src_table.key} to using SQL query: {statement}")
    self._backend.execute(statement)
    return True
60+
61+
def _migrate_managed_table(self, src_table: Table, rule: Rule):
    """Create the destination for a managed table and tag both sides.

    Executes three statements in order on the SQL backend: the create
    statement from ``uc_create_sql``, then ``sql_alter_to`` on the source and
    ``sql_alter_from`` on the destination. Always returns ``True``.
    """
    uc_key = rule.as_uc_table_key
    create_sql = src_table.uc_create_sql(uc_key)
    logger.debug(f"Migrating managed table {src_table.key} to using SQL query: {create_sql}")
    backend = self._backend
    backend.execute(create_sql)
    # Record the link in both directions via table properties.
    backend.execute(src_table.sql_alter_to(uc_key))
    backend.execute(src_table.sql_alter_from(uc_key))
    return True
69+
70+
def _migrate_view(self, src_table: Table, rule: Rule):
    """Create the destination view and tag both sides with upgrade properties.

    Executes, in order, the create statement from ``uc_create_sql``, then
    ``sql_alter_to`` on the source and ``sql_alter_from`` on the destination.

    Args:
        src_table: the source view to migrate.
        rule: mapping rule whose ``as_uc_table_key`` names the destination.

    Returns:
        ``True`` once all three statements have been submitted.
    """
    target_table_key = rule.as_uc_table_key
    table_migrate_sql = src_table.uc_create_sql(target_table_key)
    logger.debug(f"Migrating view {src_table.key} to using SQL query: {table_migrate_sql}")
    self._backend.execute(table_migrate_sql)
    self._backend.execute(src_table.sql_alter_to(target_table_key))
    self._backend.execute(src_table.sql_alter_from(target_table_key))
    # The "not supported for migration yet" message that used to follow this
    # return could never run, so it has been removed.
    return True
81+
7782
def _init_seen_tables(self):
7883
for catalog in self._ws.catalogs.list():
7984
for schema in self._ws.schemas.list(catalog_name=catalog.name):
@@ -90,8 +95,6 @@ def _get_tables_to_revert(self, schema: str | None = None, table: str | None = N
9095
upgraded_tables = []
9196
if table and not schema:
9297
logger.error("Cannot accept 'Table' parameter without 'Schema' parameter")
93-
if len(self._seen_tables) == 0:
94-
self._init_seen_tables()
9598

9699
for cur_table in self._tc.snapshot():
97100
if schema and cur_table.database != schema:
@@ -105,6 +108,7 @@ def _get_tables_to_revert(self, schema: str | None = None, table: str | None = N
105108
def revert_migrated_tables(
106109
self, schema: str | None = None, table: str | None = None, *, delete_managed: bool = False
107110
):
111+
self._init_seen_tables()
108112
upgraded_tables = self._get_tables_to_revert(schema=schema, table=table)
109113
# reverses the _seen_tables dictionary to key by the source table
110114
reverse_seen = {v: k for (k, v) in self._seen_tables.items()}
@@ -123,10 +127,11 @@ def _revert_migrated_table(self, table: Table, target_table_key: str):
123127
logger.info(
124128
f"Reverting {table.object_type} table {table.database}.{table.name} upgraded_to {table.upgraded_to}"
125129
)
126-
self._backend.execute(table.sql_unset_upgraded_to("hive_metastore"))
130+
self._backend.execute(table.sql_unset_upgraded_to())
127131
self._backend.execute(f"DROP {table.kind} IF EXISTS {target_table_key}")
128132

129133
def _get_revert_count(self, schema: str | None = None, table: str | None = None) -> list[MigrationCount]:
134+
self._init_seen_tables()
130135
upgraded_tables = self._get_tables_to_revert(schema=schema, table=table)
131136

132137
table_by_database = defaultdict(list)
@@ -183,3 +188,9 @@ def print_revert_report(self, *, delete_managed: bool) -> bool | None:
183188
print("Migrated Manged Tables (targets) will be left intact.")
184189
print("To revert and delete Migrated Tables, add --delete_managed true flag to the command.")
185190
return True
191+
192+
def _get_mapping_rules(self) -> dict[str, Rule]:
    """Index the loaded mapping rules by their source-table key.

    Each rule from ``self._tm.load()`` is stored under its
    ``as_hms_table_key``; if two rules share a key, the later one wins.
    """
    return {rule.as_hms_table_key: rule for rule in self._tm.load()}

src/databricks/labs/ucx/hive_metastore/tables.py

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -52,43 +52,22 @@ def key(self) -> str:
5252
def kind(self) -> str:
    """Return ``"VIEW"`` when ``view_text`` is set, otherwise ``"TABLE"``."""
    if self.view_text is None:
        return "TABLE"
    return "VIEW"
5454

55-
def _sql_external(self, catalog):
56-
return f"SYNC TABLE {catalog}.{self.database}.{self.name} FROM {self.key};"
57-
58-
def _sql_managed(self, catalog):
59-
if not self.is_delta:
60-
msg = f"{self.key} is not DELTA: {self.table_format}"
61-
raise ValueError(msg)
62-
return f"CREATE TABLE IF NOT EXISTS {catalog}.{self.database}.{self.name} DEEP CLONE {self.key};"
63-
64-
def _sql_view(self, catalog):
65-
return f"CREATE VIEW IF NOT EXISTS {catalog}.{self.database}.{self.name} AS {self.view_text};"
66-
67-
def uc_create_sql(self, catalog):
55+
def uc_create_sql(self, target_table_key):
6856
if self.kind == "VIEW":
69-
return self._sql_view(catalog)
57+
return self._sql_migrate_view(target_table_key)
7058
elif self.object_type == "EXTERNAL":
71-
return self._sql_external(catalog)
59+
return self._sql_migrate_external(target_table_key)
7260
else:
73-
return self._sql_managed(catalog)
74-
75-
def sql_alter_to(self, catalog):
76-
return (
77-
f"ALTER {self.kind} {self.key} SET"
78-
f" TBLPROPERTIES ('upgraded_to' = '{catalog}.{self.database}.{self.name}');"
79-
)
80-
81-
def sql_alter_from(self, catalog):
82-
return (
83-
f"ALTER {self.kind} {catalog}.{self.database}.{self.name} SET"
84-
f" TBLPROPERTIES ('upgraded_from' = '{self.key}');"
85-
)
86-
87-
def sql_unset_upgraded_to(self, catalog):
88-
return (
89-
f"ALTER {self.kind} `{catalog}`.`{self.database}`.`{self.name}` "
90-
f"UNSET TBLPROPERTIES IF EXISTS('upgraded_to');"
91-
)
61+
return self._sql_migrate_managed(target_table_key)
62+
63+
def sql_alter_to(self, target_table_key):
    """Build the statement that sets ``upgraded_to`` on this table.

    The ALTER targets ``self.key`` and stores ``target_table_key`` as the
    property value; neither is quoted or escaped here.
    """
    object_kind = self.kind
    source_key = self.key
    return f"ALTER {object_kind} {source_key} SET TBLPROPERTIES ('upgraded_to' = '{target_table_key}');"
65+
66+
def sql_alter_from(self, target_table_key):
    """Build the statement that sets ``upgraded_from`` on the target object.

    The ALTER targets ``target_table_key`` and stores ``self.key`` as the
    property value; neither is quoted or escaped here.
    """
    object_kind = self.kind
    source_key = self.key
    return f"ALTER {object_kind} {target_table_key} SET TBLPROPERTIES ('upgraded_from' = '{source_key}');"
68+
69+
def sql_unset_upgraded_to(self):
    """Build the statement that removes ``upgraded_to`` from this table.

    Uses ``UNSET TBLPROPERTIES IF EXISTS`` on ``self.key``, which is inserted
    unquoted.
    """
    object_kind = self.kind
    source_key = self.key
    return f"ALTER {object_kind} {source_key} UNSET TBLPROPERTIES IF EXISTS('upgraded_to');"
9271

9372
def is_dbfs_root(self) -> bool:
9473
if not self.location:
@@ -101,6 +80,18 @@ def is_dbfs_root(self) -> bool:
10180
return True
10281
return False
10382

83+
def _sql_migrate_external(self, target_table_key):
    """Build the ``SYNC TABLE`` statement from ``self.key`` to the target key."""
    source_key = self.key
    return f"SYNC TABLE {target_table_key} FROM {source_key};"
85+
86+
def _sql_migrate_managed(self, target_table_key):
    """Build the ``DEEP CLONE`` statement that copies this table to the target.

    Raises:
        ValueError: if ``self.is_delta`` is falsy; the message names the
            table key and its ``table_format``.
    """
    if self.is_delta:
        return f"CREATE TABLE IF NOT EXISTS {target_table_key} DEEP CLONE {self.key};"
    msg = f"{self.key} is not DELTA: {self.table_format}"
    raise ValueError(msg)
91+
92+
def _sql_migrate_view(self, target_table_key):
    """Build the ``CREATE VIEW IF NOT EXISTS`` statement for the target key.

    The view body is ``self.view_text``, inserted verbatim after ``AS``.
    """
    definition = self.view_text
    return f"CREATE VIEW IF NOT EXISTS {target_table_key} AS {definition};"
94+
10495

10596
@dataclass
10697
class TableError:

tests/integration/conftest.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from databricks.labs.ucx.framework.crawlers import SqlBackend
1616
from databricks.labs.ucx.hive_metastore import TablesCrawler
17+
from databricks.labs.ucx.hive_metastore.mapping import Rule
1718
from databricks.labs.ucx.hive_metastore.tables import Table
1819
from databricks.labs.ucx.mixins.fixtures import * # noqa: F403
1920
from databricks.labs.ucx.workspace_access.groups import MigratedGroup
@@ -134,3 +135,11 @@ def __init__(self, sql_backend: SqlBackend, schema: str, tables: list[TableInfo]
134135

135136
def snapshot(self) -> list[Table]:
    """Return the table list stored on this object as ``self._tables``."""
    tables = self._tables
    return tables
138+
139+
140+
class StaticTableMapping:
    """Table mapping that serves a fixed, in-memory list of rules.

    Offers the ``load()`` method that ``TablesMigrate._get_mapping_rules``
    calls on its mapping object, returning the rules given at construction.
    """

    def __init__(self, rules: list[Rule] | None = None):
        """Store ``rules``; ``None`` (the default) means "no rules"."""
        # Normalise None to an empty list so load() always returns an
        # iterable; the caller loops over the result directly.
        self._rules = [] if rules is None else rules

    def load(self):
        """Return the stored rules (an empty list when none were given)."""
        return self._rules

0 commit comments

Comments
 (0)