Skip to content

Commit 93dd574

Browse files
committed
Add SchemaInferenceUtilsSuite
1 parent bbd48e4 commit 93dd574

File tree

1 file changed

+273
-0
lines changed

1 file changed

+273
-0
lines changed
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.pipelines.util
19+
20+
import org.apache.spark.SparkFunSuite
21+
import org.apache.spark.sql.connector.catalog.TableChange
22+
import org.apache.spark.sql.types._
23+
24+
class SchemaInferenceUtilsSuite extends SparkFunSuite {
25+
26+
test("determineColumnChanges - adding new columns") {
27+
val currentSchema = new StructType()
28+
.add("id", IntegerType, nullable = false)
29+
.add("name", StringType)
30+
31+
val targetSchema = new StructType()
32+
.add("id", IntegerType, nullable = false)
33+
.add("name", StringType)
34+
.add("age", IntegerType)
35+
.add("email", StringType, nullable = true, "Email address")
36+
37+
val changes = SchemaInferenceUtils.diffSchemas(currentSchema, targetSchema)
38+
39+
// Should have 2 changes - adding 'age' and 'email' columns
40+
assert(changes.length === 2)
41+
42+
// Verify the changes are of the correct type and have the right properties
43+
val ageChange = changes
44+
.find {
45+
case addCol: TableChange.AddColumn => addCol.fieldNames().sameElements(Array("age"))
46+
case _ => false
47+
}
48+
.get
49+
.asInstanceOf[TableChange.AddColumn]
50+
51+
val emailChange = changes
52+
.find {
53+
case addCol: TableChange.AddColumn => addCol.fieldNames().sameElements(Array("email"))
54+
case _ => false
55+
}
56+
.get
57+
.asInstanceOf[TableChange.AddColumn]
58+
59+
// Verify age column properties
60+
assert(ageChange.dataType() === IntegerType)
61+
assert(ageChange.isNullable() === true) // Default nullable is true
62+
assert(ageChange.comment() === null)
63+
64+
// Verify email column properties
65+
assert(emailChange.dataType() === StringType)
66+
assert(emailChange.isNullable() === true)
67+
assert(emailChange.comment() === "Email address")
68+
}
69+
70+
test("determineColumnChanges - updating column types") {
71+
val currentSchema = new StructType()
72+
.add("id", IntegerType, nullable = false)
73+
.add("amount", DoubleType)
74+
.add("timestamp", TimestampType)
75+
76+
val targetSchema = new StructType()
77+
.add("id", LongType, nullable = false) // Changed type from Int to Long
78+
.add("amount", DecimalType(10, 2)) // Changed type from Double to Decimal
79+
.add("timestamp", TimestampType) // No change
80+
81+
val changes = SchemaInferenceUtils.diffSchemas(currentSchema, targetSchema)
82+
83+
// Should have 2 changes - updating 'id' and 'amount' column types
84+
assert(changes.length === 2)
85+
86+
// Verify the changes are of the correct type
87+
val idChange = changes
88+
.find {
89+
case update: TableChange.UpdateColumnType => update.fieldNames().sameElements(Array("id"))
90+
case _ => false
91+
}
92+
.get
93+
.asInstanceOf[TableChange.UpdateColumnType]
94+
95+
val amountChange = changes
96+
.find {
97+
case update: TableChange.UpdateColumnType =>
98+
update.fieldNames().sameElements(Array("amount"))
99+
case _ => false
100+
}
101+
.get
102+
.asInstanceOf[TableChange.UpdateColumnType]
103+
104+
// Verify the new data types
105+
assert(idChange.newDataType() === LongType)
106+
assert(amountChange.newDataType() === DecimalType(10, 2))
107+
}
108+
109+
test("determineColumnChanges - updating nullability and comments") {
110+
val currentSchema = new StructType()
111+
.add("id", IntegerType, nullable = false)
112+
.add("name", StringType, nullable = true)
113+
.add("description", StringType, nullable = true, "Item description")
114+
115+
val targetSchema = new StructType()
116+
.add("id", IntegerType, nullable = true) // Changed nullability
117+
.add("name", StringType, nullable = false) // Changed nullability
118+
.add("description", StringType, nullable = true, "Product description") // Changed comment
119+
120+
val changes = SchemaInferenceUtils.diffSchemas(currentSchema, targetSchema)
121+
122+
// Should have 3 changes - updating nullability for 'id' and 'name', and comment for
123+
// 'description'
124+
assert(changes.length === 3)
125+
126+
// Verify the nullability changes
127+
val idNullabilityChange = changes
128+
.find {
129+
case update: TableChange.UpdateColumnNullability =>
130+
update.fieldNames().sameElements(Array("id"))
131+
case _ => false
132+
}
133+
.get
134+
.asInstanceOf[TableChange.UpdateColumnNullability]
135+
136+
val nameNullabilityChange = changes
137+
.find {
138+
case update: TableChange.UpdateColumnNullability =>
139+
update.fieldNames().sameElements(Array("name"))
140+
case _ => false
141+
}
142+
.get
143+
.asInstanceOf[TableChange.UpdateColumnNullability]
144+
145+
// Verify the comment change
146+
val descriptionCommentChange = changes
147+
.find {
148+
case update: TableChange.UpdateColumnComment =>
149+
update.fieldNames().sameElements(Array("description"))
150+
case _ => false
151+
}
152+
.get
153+
.asInstanceOf[TableChange.UpdateColumnComment]
154+
155+
// Verify the new nullability values
156+
assert(idNullabilityChange.nullable() === true)
157+
assert(nameNullabilityChange.nullable() === false)
158+
159+
// Verify the new comment
160+
assert(descriptionCommentChange.newComment() === "Product description")
161+
}
162+
163+
test("determineColumnChanges - complex changes") {
164+
val currentSchema = new StructType()
165+
.add("id", IntegerType, nullable = false)
166+
.add("name", StringType)
167+
.add("old_field", BooleanType)
168+
169+
val targetSchema = new StructType()
170+
.add("id", LongType, nullable = true) // Changed type and nullability
171+
// Added comment and changed nullability
172+
.add("name", StringType, nullable = false, "Full name")
173+
.add("new_field", StringType) // New field
174+
175+
val changes = SchemaInferenceUtils.diffSchemas(currentSchema, targetSchema)
176+
177+
// Should have these changes:
178+
// 1. Update id type
179+
// 2. Update id nullability
180+
// 3. Update name nullability
181+
// 4. Update name comment
182+
// 5. Add new_field
183+
// 6. Remove old_field
184+
assert(changes.length === 6)
185+
186+
// Count the types of changes
187+
val typeChanges = changes.collect { case _: TableChange.UpdateColumnType => 1 }.size
188+
val nullabilityChanges = changes.collect {
189+
case _: TableChange.UpdateColumnNullability => 1
190+
}.size
191+
val commentChanges = changes.collect { case _: TableChange.UpdateColumnComment => 1 }.size
192+
val addColumnChanges = changes.collect { case _: TableChange.AddColumn => 1 }.size
193+
194+
assert(typeChanges === 1)
195+
assert(nullabilityChanges === 2)
196+
assert(commentChanges === 1)
197+
assert(addColumnChanges === 1)
198+
}
199+
200+
test("determineColumnChanges - no changes") {
201+
val schema = new StructType()
202+
.add("id", IntegerType, nullable = false)
203+
.add("name", StringType)
204+
.add("timestamp", TimestampType)
205+
206+
// Same schema, no changes expected
207+
val changes = SchemaInferenceUtils.diffSchemas(schema, schema)
208+
assert(changes.isEmpty)
209+
}
210+
211+
test("determineColumnChanges - deleting columns") {
212+
val currentSchema = new StructType()
213+
.add("id", IntegerType, nullable = false)
214+
.add("name", StringType)
215+
.add("age", IntegerType)
216+
.add("email", StringType)
217+
.add("phone", StringType)
218+
219+
val targetSchema = new StructType()
220+
.add("id", IntegerType, nullable = false)
221+
.add("name", StringType)
222+
// age, email, and phone columns are removed
223+
224+
val changes = SchemaInferenceUtils.diffSchemas(currentSchema, targetSchema)
225+
226+
// Should have 3 changes - deleting 'age', 'email', and 'phone' columns
227+
assert(changes.length === 3)
228+
229+
// Verify all changes are DeleteColumn operations
230+
val deleteChanges = changes.collect { case dc: TableChange.DeleteColumn => dc }
231+
assert(deleteChanges.length === 3)
232+
233+
// Verify the specific columns being deleted
234+
val columnNames = deleteChanges.map(_.fieldNames()(0)).toSet
235+
assert(columnNames === Set("age", "email", "phone"))
236+
}
237+
238+
test("determineColumnChanges - mixed additions and deletions") {
239+
val currentSchema = new StructType()
240+
.add("id", IntegerType, nullable = false)
241+
.add("first_name", StringType)
242+
.add("last_name", StringType)
243+
.add("age", IntegerType)
244+
245+
val targetSchema = new StructType()
246+
.add("id", IntegerType, nullable = false)
247+
.add("full_name", StringType) // New column
248+
.add("email", StringType) // New column
249+
.add("age", IntegerType) // Unchanged
250+
// first_name and last_name are removed
251+
252+
val changes = SchemaInferenceUtils.diffSchemas(currentSchema, targetSchema)
253+
254+
// Should have 4 changes:
255+
// - 2 additions (full_name, email)
256+
// - 2 deletions (first_name, last_name)
257+
assert(changes.length === 4)
258+
259+
// Count the types of changes
260+
val addChanges = changes.collect { case ac: TableChange.AddColumn => ac }
261+
val deleteChanges = changes.collect { case dc: TableChange.DeleteColumn => dc }
262+
263+
assert(addChanges.length === 2)
264+
assert(deleteChanges.length === 2)
265+
266+
// Verify the specific columns being added and deleted
267+
val addedColumnNames = addChanges.map(_.fieldNames()(0)).toSet
268+
val deletedColumnNames = deleteChanges.map(_.fieldNames()(0)).toSet
269+
270+
assert(addedColumnNames === Set("full_name", "email"))
271+
assert(deletedColumnNames === Set("first_name", "last_name"))
272+
}
273+
}

0 commit comments

Comments
 (0)