Skip to content

Commit db580a9

Browse files
authored
Added functionality to determine migration method based on DBFS Root (#759)
1 parent ff97db8 commit db580a9

File tree

7 files changed

+160
-54
lines changed

7 files changed

+160
-54
lines changed

src/databricks/labs/ucx/hive_metastore/mapping.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ def get_tables_to_migrate(self, tables_crawler: TablesCrawler):
134134
if rule.src_schema not in databases_in_scope:
135135
logger.info(f"Table {rule.as_hms_table_key} is in a database that was marked to be skipped")
136136
continue
137+
if crawled_tables_keys[rule.as_hms_table_key].is_databricks_dataset:
138+
logger.info(f"Table {rule.as_hms_table_key} is a db demo dataset and will not be upgraded")
139+
continue
137140
tasks.append(
138141
partial(self._get_table_in_scope_task, TableToMigrate(crawled_tables_keys[rule.as_hms_table_key], rule))
139142
)

src/databricks/labs/ucx/hive_metastore/table_migrate.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,24 +39,25 @@ def _migrate_table(self, src_table: Table, rule: Rule):
3939
if self._table_already_upgraded(rule.as_uc_table_key):
4040
logger.info(f"Table {src_table.key} already upgraded to {rule.as_uc_table_key}")
4141
return True
42-
if src_table.object_type == "MANAGED":
43-
return self._migrate_managed_table(src_table, rule)
42+
if src_table.kind == "TABLE" and src_table.table_format == "DELTA" and src_table.is_dbfs_root:
43+
return self._migrate_dbfs_root_table(src_table, rule)
44+
if src_table.kind == "TABLE" and src_table.is_format_supported_for_sync:
45+
return self._migrate_external_table(src_table, rule)
4446
if src_table.kind == "VIEW":
4547
return self._migrate_view(src_table, rule)
46-
if src_table.object_type == "EXTERNAL":
47-
return self._migrate_external_table(src_table, rule)
48+
logger.info(f"Table {src_table.key} is not supported for migration")
4849
return True
4950

5051
def _migrate_external_table(self, src_table: Table, rule: Rule):
5152
target_table_key = rule.as_uc_table_key
52-
table_migrate_sql = src_table.uc_create_sql(target_table_key)
53+
table_migrate_sql = src_table.sql_migrate_external(target_table_key)
5354
logger.debug(f"Migrating external table {src_table.key} to using SQL query: {table_migrate_sql}")
5455
self._backend.execute(table_migrate_sql)
5556
return True
5657

57-
def _migrate_managed_table(self, src_table: Table, rule: Rule):
58+
def _migrate_dbfs_root_table(self, src_table: Table, rule: Rule):
5859
target_table_key = rule.as_uc_table_key
59-
table_migrate_sql = src_table.uc_create_sql(target_table_key)
60+
table_migrate_sql = src_table.sql_migrate_dbfs(target_table_key)
6061
logger.debug(f"Migrating managed table {src_table.key} to using SQL query: {table_migrate_sql}")
6162
self._backend.execute(table_migrate_sql)
6263
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
@@ -65,16 +66,13 @@ def _migrate_managed_table(self, src_table: Table, rule: Rule):
6566

6667
def _migrate_view(self, src_table: Table, rule: Rule):
6768
target_table_key = rule.as_uc_table_key
68-
table_migrate_sql = src_table.uc_create_sql(target_table_key)
69+
table_migrate_sql = src_table.sql_migrate_view(target_table_key)
6970
logger.debug(f"Migrating view {src_table.key} to using SQL query: {table_migrate_sql}")
7071
self._backend.execute(table_migrate_sql)
7172
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
7273
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key))
7374
return True
7475

75-
msg = f"Table {src_table.key} is a {src_table.object_type} and is not supported for migration yet"
76-
logger.info(msg)
77-
7876
def _init_seen_tables(self):
7977
for catalog in self._ws.catalogs.list():
8078
for schema in self._ws.schemas.list(catalog_name=catalog.name):

src/databricks/labs/ucx/hive_metastore/table_size.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def _crawl(self) -> Iterable[TableSize]:
4040
for table in self._tables_crawler.snapshot():
4141
if not table.kind == "TABLE":
4242
continue
43-
if not table.is_dbfs_root():
43+
if not table.is_dbfs_root:
4444
continue
4545
size_in_bytes = self.get_table_size(table.key)
4646
yield TableSize(

src/databricks/labs/ucx/hive_metastore/tables.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ class Table:
3535
DBFS_ROOT_PREFIX_EXCEPTIONS: typing.ClassVar[list[str]] = [
3636
"/dbfs/mnt",
3737
"dbfs:/mnt",
38+
]
39+
40+
DBFS_DATABRICKS_DATASETS_PREFIXES: typing.ClassVar[list[str]] = [
3841
"/dbfs/databricks-datasets",
3942
"dbfs:/databricks-datasets",
4043
]
@@ -53,14 +56,6 @@ def key(self) -> str:
5356
def kind(self) -> str:
5457
return "VIEW" if self.view_text is not None else "TABLE"
5558

56-
def uc_create_sql(self, target_table_key):
57-
if self.kind == "VIEW":
58-
return self._sql_migrate_view(target_table_key)
59-
elif self.object_type == "EXTERNAL":
60-
return self._sql_migrate_external(target_table_key)
61-
else:
62-
return self._sql_migrate_managed(target_table_key)
63-
6459
def sql_alter_to(self, target_table_key):
6560
return f"ALTER {self.kind} {self.key} SET TBLPROPERTIES ('upgraded_to' = '{target_table_key}');"
6661

@@ -70,27 +65,46 @@ def sql_alter_from(self, target_table_key):
7065
def sql_unset_upgraded_to(self):
7166
return f"ALTER {self.kind} {self.key} UNSET TBLPROPERTIES IF EXISTS('upgraded_to');"
7267

68+
@property
7369
def is_dbfs_root(self) -> bool:
7470
if not self.location:
7571
return False
76-
for exception in self.DBFS_ROOT_PREFIX_EXCEPTIONS:
77-
if self.location.startswith(exception):
78-
return False
7972
for prefix in self.DBFS_ROOT_PREFIXES:
8073
if self.location.startswith(prefix):
74+
for exception in self.DBFS_ROOT_PREFIX_EXCEPTIONS:
75+
if self.location.startswith(exception):
76+
return False
77+
for db_datasets in self.DBFS_DATABRICKS_DATASETS_PREFIXES:
78+
if self.location.startswith(db_datasets):
79+
return False
80+
return True
81+
return False
82+
83+
@property
84+
def is_format_supported_for_sync(self) -> bool:
85+
if self.table_format is None:
86+
return False
87+
return self.table_format.upper() in ("DELTA", "PARQUET", "CSV", "JSON", "ORC", "TEXT")
88+
89+
@property
90+
def is_databricks_dataset(self) -> bool:
91+
if not self.location:
92+
return False
93+
for db_datasets in self.DBFS_DATABRICKS_DATASETS_PREFIXES:
94+
if self.location.startswith(db_datasets):
8195
return True
8296
return False
8397

84-
def _sql_migrate_external(self, target_table_key):
98+
def sql_migrate_external(self, target_table_key):
8599
return f"SYNC TABLE {target_table_key} FROM {self.key};"
86100

87-
def _sql_migrate_managed(self, target_table_key):
101+
def sql_migrate_dbfs(self, target_table_key):
88102
if not self.is_delta:
89103
msg = f"{self.key} is not DELTA: {self.table_format}"
90104
raise ValueError(msg)
91105
return f"CREATE TABLE IF NOT EXISTS {target_table_key} DEEP CLONE {self.key};"
92106

93-
def _sql_migrate_view(self, target_table_key):
107+
def sql_migrate_view(self, target_table_key):
94108
return f"CREATE VIEW IF NOT EXISTS {target_table_key} AS {self.view_text};"
95109

96110

tests/integration/hive_metastore/test_migrate.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ def test_mapping_skips_tables_databases(ws, sql_backend, inventory_schema, make_
198198
src_schema1 = make_schema(catalog_name="hive_metastore")
199199
src_schema2 = make_schema(catalog_name="hive_metastore")
200200
table_to_migrate = make_table(schema_name=src_schema1.name)
201+
table_databricks_dataset = make_table(
202+
schema_name=src_schema1.name, external_csv="dbfs:/databricks-datasets/adult/adult.data"
203+
)
201204
table_to_skip = make_table(schema_name=src_schema1.name)
202205
table_in_skipped_database = make_table(schema_name=src_schema2.name)
203206
all_tables = [table_to_migrate, table_to_skip, table_in_skipped_database]
@@ -224,6 +227,14 @@ def test_mapping_skips_tables_databases(ws, sql_backend, inventory_schema, make_
224227
table_to_skip.name,
225228
table_to_skip.name,
226229
),
230+
Rule(
231+
"workspace",
232+
dst_catalog.name,
233+
src_schema1.name,
234+
dst_schema1.name,
235+
table_databricks_dataset.name,
236+
table_databricks_dataset.name,
237+
),
227238
Rule(
228239
"workspace",
229240
dst_catalog.name,

tests/unit/hive_metastore/test_table_migrate.py

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
logger = logging.getLogger(__name__)
2424

2525

26-
def test_migrate_managed_tables_should_produce_proper_queries():
26+
def test_migrate_dbfs_root_tables_should_produce_proper_queries():
2727
errors = {}
2828
rows = {}
2929
backend = MockBackend(fails_on_first=errors, rows=rows)
@@ -32,20 +32,38 @@ def test_migrate_managed_tables_should_produce_proper_queries():
3232
table_mapping = create_autospec(TableMapping)
3333
table_mapping.get_tables_to_migrate.return_value = [
3434
TableToMigrate(
35-
Table("hive_metastore", "db1_src", "managed_src", "MANAGED", "DELTA"),
36-
Rule("workspace", "ucx_default", "db1_src", "db1_dst", "managed_src", "managed_dst"),
37-
)
35+
Table("hive_metastore", "db1_src", "managed_dbfs", "MANAGED", "DELTA", "dbfs:/some_location"),
36+
Rule("workspace", "ucx_default", "db1_src", "db1_dst", "managed_dbfs", "managed_dbfs"),
37+
),
38+
TableToMigrate(
39+
Table("hive_metastore", "db1_src", "managed_mnt", "MANAGED", "DELTA", "s3:/mnt/location"),
40+
Rule("workspace", "ucx_default", "db1_src", "db1_dst", "managed_mnt", "managed_mnt"),
41+
),
42+
TableToMigrate(
43+
Table("hive_metastore", "db1_src", "managed_other", "MANAGED", "DELTA", "s3:/location"),
44+
Rule("workspace", "ucx_default", "db1_src", "db1_dst", "managed_other", "managed_other"),
45+
),
3846
]
3947
table_migrate = TablesMigrate(table_crawler, client, backend, table_mapping)
4048
table_migrate.migrate_tables()
4149

42-
assert (list(backend.queries)) == [
43-
"CREATE TABLE IF NOT EXISTS ucx_default.db1_dst.managed_dst DEEP CLONE hive_metastore.db1_src.managed_src;",
44-
"ALTER TABLE hive_metastore.db1_src.managed_src "
45-
"SET TBLPROPERTIES ('upgraded_to' = 'ucx_default.db1_dst.managed_dst');",
46-
"ALTER TABLE ucx_default.db1_dst.managed_dst "
47-
"SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1_src.managed_src');",
48-
]
50+
assert (
51+
"CREATE TABLE IF NOT EXISTS ucx_default.db1_dst.managed_dbfs DEEP CLONE hive_metastore.db1_src.managed_dbfs;"
52+
) in list(backend.queries)
53+
assert "SYNC TABLE ucx_default.db1_dst.managed_mnt FROM hive_metastore.db1_src.managed_mnt;" in list(
54+
backend.queries
55+
)
56+
assert (
57+
"ALTER TABLE hive_metastore.db1_src.managed_dbfs "
58+
"SET TBLPROPERTIES ('upgraded_to' = 'ucx_default.db1_dst.managed_dbfs');"
59+
) in list(backend.queries)
60+
assert (
61+
"ALTER TABLE ucx_default.db1_dst.managed_dbfs "
62+
"SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1_src.managed_dbfs');"
63+
) in list(backend.queries)
64+
assert "SYNC TABLE ucx_default.db1_dst.managed_other FROM hive_metastore.db1_src.managed_other;" in list(
65+
backend.queries
66+
)
4967

5068

5169
def test_migrate_external_tables_should_produce_proper_queries():
@@ -85,13 +103,15 @@ def test_migrate_view_should_produce_proper_queries():
85103
table_migrate = TablesMigrate(table_crawler, client, backend, table_mapping)
86104
table_migrate.migrate_tables()
87105

88-
assert (list(backend.queries)) == [
89-
"CREATE VIEW IF NOT EXISTS ucx_default.db1_dst.view_dst AS SELECT * FROM table;",
106+
assert "CREATE VIEW IF NOT EXISTS ucx_default.db1_dst.view_dst AS SELECT * FROM table;" in list(backend.queries)
107+
assert (
90108
"ALTER VIEW hive_metastore.db1_src.view_src "
91-
"SET TBLPROPERTIES ('upgraded_to' = 'ucx_default.db1_dst.view_dst');",
109+
"SET TBLPROPERTIES ('upgraded_to' = 'ucx_default.db1_dst.view_dst');"
110+
) in list(backend.queries)
111+
assert (
92112
"ALTER VIEW ucx_default.db1_dst.view_dst "
93-
"SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1_src.view_src');",
94-
]
113+
"SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1_src.view_src');"
114+
) in list(backend.queries)
95115

96116

97117
def get_table_migrate(backend: SqlBackend) -> TablesMigrate:

tests/unit/hive_metastore/test_tables.py

Lines changed: 72 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,42 @@ def test_sql_managed_non_delta():
4141
with pytest.raises(ValueError):
4242
Table(
4343
catalog="catalog", database="db", name="table", object_type="type", table_format="PARQUET"
44-
)._sql_migrate_managed("catalog")
44+
).sql_migrate_dbfs("catalog")
4545

4646

4747
@pytest.mark.parametrize(
4848
"table,target,query",
4949
[
5050
(
51-
Table(catalog="catalog", database="db", name="managed_table", object_type="..", table_format="DELTA"),
51+
Table(
52+
catalog="catalog",
53+
database="db",
54+
name="managed_table",
55+
object_type="MANAGED",
56+
table_format="DELTA",
57+
location="dbfs:/location/table",
58+
),
5259
"new_catalog.db.managed_table",
5360
"CREATE TABLE IF NOT EXISTS new_catalog.db.managed_table DEEP CLONE catalog.db.managed_table;",
5461
),
62+
(
63+
Table(
64+
catalog="catalog",
65+
database="db",
66+
name="managed_table",
67+
object_type="MANAGED",
68+
table_format="DELTA",
69+
location="dbfs:/mnt/location/table",
70+
),
71+
"new_catalog.db.managed_table",
72+
"SYNC TABLE new_catalog.db.managed_table FROM catalog.db.managed_table;",
73+
),
5574
(
5675
Table(
5776
catalog="catalog",
5877
database="db",
5978
name="view",
60-
object_type="..",
79+
object_type="VIEW",
6180
table_format="DELTA",
6281
view_text="SELECT * FROM table",
6382
),
@@ -79,7 +98,12 @@ def test_sql_managed_non_delta():
7998
],
8099
)
81100
def test_uc_sql(table, target, query):
82-
assert table.uc_create_sql(target) == query
101+
if table.kind == "VIEW":
102+
assert table.sql_migrate_view(target) == query
103+
if table.kind == "TABLE" and table.is_dbfs_root:
104+
assert table.sql_migrate_dbfs(target) == query
105+
if table.kind == "TABLE" and not table.is_dbfs_root:
106+
assert table.sql_migrate_external(target) == query
83107

84108

85109
def test_tables_crawler_inventory_table():
@@ -113,15 +137,51 @@ def test_tables_returning_error_when_describing():
113137

114138

115139
def test_is_dbfs_root():
116-
assert Table("a", "b", "c", "MANAGED", "DELTA", location="dbfs:/somelocation/tablename").is_dbfs_root()
117-
assert Table("a", "b", "c", "MANAGED", "DELTA", location="/dbfs/somelocation/tablename").is_dbfs_root()
118-
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="dbfs:/mnt/somelocation/tablename").is_dbfs_root()
119-
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="/dbfs/mnt/somelocation/tablename").is_dbfs_root()
140+
assert Table("a", "b", "c", "MANAGED", "DELTA", location="dbfs:/somelocation/tablename").is_dbfs_root
141+
assert Table("a", "b", "c", "MANAGED", "DELTA", location="/dbfs/somelocation/tablename").is_dbfs_root
142+
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="dbfs:/mnt/somelocation/tablename").is_dbfs_root
143+
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="/dbfs/mnt/somelocation/tablename").is_dbfs_root
120144
assert not Table(
121145
"a", "b", "c", "MANAGED", "DELTA", location="dbfs:/databricks-datasets/somelocation/tablename"
122-
).is_dbfs_root()
146+
).is_dbfs_root
147+
assert not Table(
148+
"a", "b", "c", "MANAGED", "DELTA", location="/dbfs/databricks-datasets/somelocation/tablename"
149+
).is_dbfs_root
150+
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="s3:/somelocation/tablename").is_dbfs_root
151+
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="adls:/somelocation/tablename").is_dbfs_root
152+
153+
154+
def test_is_db_dataset():
155+
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="dbfs:/somelocation/tablename").is_databricks_dataset
156+
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="/dbfs/somelocation/tablename").is_databricks_dataset
157+
assert not Table(
158+
"a", "b", "c", "MANAGED", "DELTA", location="dbfs:/mnt/somelocation/tablename"
159+
).is_databricks_dataset
123160
assert not Table(
161+
"a", "b", "c", "MANAGED", "DELTA", location="/dbfs/mnt/somelocation/tablename"
162+
).is_databricks_dataset
163+
assert Table(
164+
"a", "b", "c", "MANAGED", "DELTA", location="dbfs:/databricks-datasets/somelocation/tablename"
165+
).is_databricks_dataset
166+
assert Table(
124167
"a", "b", "c", "MANAGED", "DELTA", location="/dbfs/databricks-datasets/somelocation/tablename"
125-
).is_dbfs_root()
126-
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="s3:/somelocation/tablename").is_dbfs_root()
127-
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="adls:/somelocation/tablename").is_dbfs_root()
168+
).is_databricks_dataset
169+
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="s3:/somelocation/tablename").is_databricks_dataset
170+
assert not Table("a", "b", "c", "MANAGED", "DELTA", location="adls:/somelocation/tablename").is_databricks_dataset
171+
172+
173+
def test_is_supported_for_sync():
174+
assert Table(
175+
"a", "b", "c", "EXTERNAL", "DELTA", location="dbfs:/somelocation/tablename"
176+
).is_format_supported_for_sync
177+
assert Table("a", "b", "c", "EXTERNAL", "CSV", location="dbfs:/somelocation/tablename").is_format_supported_for_sync
178+
assert Table(
179+
"a", "b", "c", "EXTERNAL", "TEXT", location="dbfs:/somelocation/tablename"
180+
).is_format_supported_for_sync
181+
assert Table("a", "b", "c", "EXTERNAL", "ORC", location="dbfs:/somelocation/tablename").is_format_supported_for_sync
182+
assert Table(
183+
"a", "b", "c", "EXTERNAL", "JSON", location="dbfs:/somelocation/tablename"
184+
).is_format_supported_for_sync
185+
assert not (
186+
Table("a", "b", "c", "EXTERNAL", "AVRO", location="dbfs:/somelocation/tablename").is_format_supported_for_sync
187+
)

0 commit comments

Comments
 (0)