1
+ import dataclasses
2
+ from typing import Literal
3
+
1
4
import pytest
2
- from databricks .sdk . errors import NotFound
5
+ from databricks .labs . lsql . core import Row
3
6
4
7
from databricks .labs .ucx .framework .utils import escape_sql_identifier
5
8
from databricks .labs .ucx .hive_metastore .tables import Table
6
9
7
10
8
11
@pytest .mark .parametrize (
9
- "prepare_tables_for_migration, workflow" ,
12
+ "scenario, workflow" ,
10
13
[
11
14
("regular" , "migrate-tables" ),
12
15
("hiveserde" , "migrate-external-hiveserde-tables-in-place-experimental" ),
13
16
("hiveserde" , "migrate-external-tables-ctas" ),
14
17
],
15
- indirect = ("prepare_tables_for_migration" ,),
16
18
)
17
19
def test_table_migration_job_refreshes_migration_status (
18
- ws ,
19
20
installation_ctx ,
20
- prepare_tables_for_migration ,
21
- workflow ,
22
- ):
21
+ scenario : Literal ["regular" , "hiveserde" ],
22
+ workflow : str ,
23
+ make_table_migration_context ,
24
+ ) -> None :
23
25
"""The migration status should be refreshed after the migration job."""
24
- tables , _ = prepare_tables_for_migration
26
+ tables , _ = make_table_migration_context ( scenario , installation_ctx )
25
27
ctx = installation_ctx .replace (
28
+ config_transform = lambda wc : dataclasses .replace (
29
+ wc ,
30
+ skip_tacl_migration = True ,
31
+ ),
26
32
extend_prompts = {
27
33
r".*Do you want to update the existing installation?.*" : 'yes' ,
28
34
},
29
35
)
30
36
31
37
ctx .workspace_installation .run ()
32
- ctx .deployed_workflows .run_workflow (workflow )
38
+ ctx .deployed_workflows .run_workflow (workflow , skip_job_wait = True )
39
+
40
+ assert ctx .deployed_workflows .validate_step (workflow )
33
41
34
42
# Avoiding MigrationStatusRefresh as it will refresh the status before fetching
35
43
migration_status_query = f"SELECT * FROM { ctx .config .inventory_database } .migration_status"
@@ -62,85 +70,76 @@ def test_table_migration_job_refreshes_migration_status(
62
70
assert len (asserts ) == 0 , assert_message
63
71
64
72
65
def test_table_migration_convert_manged_to_external(installation_ctx, make_table_migration_context) -> None:
    """Exercise the CONVERT_TO_EXTERNAL scenario: a managed Hive table is converted to EXTERNAL during migration.

    Note(review): "manged" in the test name looks like a typo for "managed";
    kept as-is to avoid breaking test selection by name.
    """
    tables, dst_schema = make_table_migration_context("managed", installation_ctx)
    ctx = installation_ctx.replace(
        # Run only the table migration; TACL migration is out of scope for this test.
        config_transform=lambda wc: dataclasses.replace(
            wc,
            skip_tacl_migration=True,
        ),
        extend_prompts={
            # "0" selects the CONVERT_TO_EXTERNAL option for managed tables.
            r"If hive_metastore contains managed table with external.*": "0",
            r".*Do you want to update the existing installation?.*": 'yes',
        },
    )

    ctx.workspace_installation.run()
    ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True)

    assert ctx.deployed_workflows.validate_step("migrate-tables")

    # Collect every table missing from the destination schema so a failure
    # reports all of them at once instead of stopping at the first.
    missing_tables = set[str]()
    for table in tables.values():
        migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}"
        if not ctx.workspace_client.tables.exists(migrated_table_name):
            missing_tables.add(migrated_table_name)
    assert not missing_tables, f"Missing migrated tables: {missing_tables}"

    # The source managed table must have been converted in place to EXTERNAL.
    managed_table = tables["src_managed_table"]
    table_type = next(
        (
            value
            for key, value, _ in ctx.sql_backend.fetch(
                f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}"
            )
            if key == "Type"
        ),
        None,
    )
    # Previously this was a loop that silently passed when DESCRIBE returned no
    # "Type" row; asserting on the extracted value makes that case fail loudly.
    assert table_type == "EXTERNAL", f"Expected EXTERNAL table type, got: {table_type}"
98
@pytest.mark.parametrize(
    "workflow", ["migrate-external-hiveserde-tables-in-place-experimental", "migrate-external-tables-ctas"]
)
def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_migration_context, workflow) -> None:
    """Hiveserde tables should be migrated by both the in-place and the CTAS workflows."""
    tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx)
    ctx = installation_ctx.replace(
        # Run only the table migration; TACL migration is out of scope for this test.
        config_transform=lambda wc: dataclasses.replace(
            wc,
            skip_tacl_migration=True,
        ),
        extend_prompts={
            r".*Do you want to update the existing installation?.*": 'yes',
        },
    )
    ctx.workspace_installation.run()

    ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True)

    # Validate via the customized context (`ctx`), not the raw fixture, for
    # consistency with the other migration tests in this module — the workflow
    # was run through `ctx`, so it should be validated through `ctx` as well.
    assert ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}"

    # Collect every table missing from the destination schema so a failure
    # reports all of them at once instead of stopping at the first.
    missing_tables = set[str]()
    for table in tables.values():
        migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}"
        if not ctx.workspace_client.tables.exists(migrated_table_name):
            missing_tables.add(migrated_table_name)
    assert not missing_tables, f"Missing migrated tables: {missing_tables}"
116
132
117
133
118
- @pytest .mark .parametrize ('prepare_tables_for_migration' , [('hiveserde' )], indirect = True )
119
- def test_hiveserde_table_ctas_migration_job (ws , installation_ctx , prepare_tables_for_migration ):
120
- tables , dst_schema = prepare_tables_for_migration
134
+ def test_table_migration_job_publishes_remaining_tables (installation_ctx , make_table_migration_context ) -> None :
135
+ tables , dst_schema = make_table_migration_context ("regular" , installation_ctx )
121
136
ctx = installation_ctx .replace (
122
- extend_prompts = {
123
- r".*Do you want to update the existing installation?.*" : 'yes' ,
124
- },
137
+ config_transform = lambda wc : dataclasses .replace (
138
+ wc ,
139
+ skip_tacl_migration = True ,
140
+ ),
125
141
)
126
142
ctx .workspace_installation .run ()
127
- ctx .deployed_workflows .run_workflow ("migrate-external-tables-ctas" )
128
- # assert the workflow is successful
129
- assert ctx .deployed_workflows .validate_step ("migrate-external-tables-ctas" )
130
- # assert the tables are migrated
131
- for table in tables .values ():
132
- try :
133
- assert ws .tables .get (f"{ dst_schema .catalog_name } .{ dst_schema .name } .{ table .name } " ).name
134
- except NotFound :
135
- assert False , f"{ table .name } not found in { dst_schema .catalog_name } .{ dst_schema .name } "
136
-
137
-
138
- @pytest .mark .parametrize ('prepare_tables_for_migration' , ['regular' ], indirect = True )
139
- def test_table_migration_job_publishes_remaining_tables (
140
- ws , installation_ctx , sql_backend , prepare_tables_for_migration , caplog
141
- ):
142
- tables , dst_schema = prepare_tables_for_migration
143
- installation_ctx .workspace_installation .run ()
144
143
second_table = list (tables .values ())[1 ]
145
144
table = Table (
146
145
"hive_metastore" ,
@@ -149,19 +148,20 @@ def test_table_migration_job_publishes_remaining_tables(
149
148
object_type = "UNKNOWN" ,
150
149
table_format = "UNKNOWN" ,
151
150
)
152
- installation_ctx .table_mapping .skip_table_or_view (dst_schema .name , second_table .name , load_table = lambda * _ : table )
153
- installation_ctx . deployed_workflows . run_workflow ( "migrate-tables" )
154
- assert installation_ctx .deployed_workflows .validate_step ("migrate-tables" )
151
+ ctx .table_mapping .skip_table_or_view (dst_schema .name , second_table .name , load_table = lambda * _ : table )
152
+
153
+ ctx .deployed_workflows .run_workflow ("migrate-tables" , skip_job_wait = True )
155
154
155
+ assert ctx .deployed_workflows .validate_step ("migrate-tables" )
156
156
remaining_tables = list (
157
- sql_backend .fetch (
157
+ ctx . sql_backend .fetch (
158
158
f"""
159
159
SELECT
160
160
SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1)
161
161
AS message
162
- FROM { installation_ctx .inventory_database } .logs
162
+ FROM { ctx .inventory_database } .logs
163
163
WHERE message LIKE 'remained-hive-metastore-table: %'
164
164
"""
165
165
)
166
166
)
167
- assert remaining_tables [ 0 ]. message == f' hive_metastore.{ dst_schema .name } .{ second_table .name } '
167
+ assert remaining_tables == [ Row ( message = f" hive_metastore.{ dst_schema .name } .{ second_table .name } " )]
0 commit comments