@@ -20,9 +20,13 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions.{Cast, DefaultStringProducingExpression, Expression, Literal, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateTable, CreateTempView, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, ReplaceTable, TableSpec, V2CreateTablePlan}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, Table, TableCatalog}
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_COLLATION
-import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{CharType, DataType, StringType, StructField, VarcharType}

 /**
  * Resolves string types in logical plans by assigning them the appropriate collation. The
@@ -33,12 +37,13 @@ import org.apache.spark.sql.types.{DataType, StringType}
  */
 object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val planWithResolvedDefaultCollation = resolveDefaultCollation(plan)
+    val preprocessedPlan = Seq(resolveDefaultCollation _, resolveAlterColumnsDataType _)
+      .foldLeft(plan) { case (currentPlan, resolver) => resolver(currentPlan) }

-    fetchDefaultCollation(planWithResolvedDefaultCollation) match {
+    fetchDefaultCollation(preprocessedPlan) match {
       case Some(collation) =>
-        transform(planWithResolvedDefaultCollation, StringType(collation))
-      case None => planWithResolvedDefaultCollation
+        transform(preprocessedPlan, StringType(collation))
+      case None => preprocessedPlan
     }
   }

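The rewritten apply chains the two pre-processing passes with foldLeft before the default collation is fetched. A minimal, self-contained sketch of that fold-over-resolvers pattern (the Plan type and the pass bodies below are illustrative stand-ins, not Spark's):

object ResolverChainSketch {
  // Hypothetical stand-in for LogicalPlan, only to make the sketch runnable.
  final case class Plan(steps: List[String])

  def resolveDefaultCollation(p: Plan): Plan = p.copy(steps = p.steps :+ "defaultCollation")
  def resolveAlterColumnsDataType(p: Plan): Plan = p.copy(steps = p.steps :+ "alterColumnsDataType")

  // Each pass is a Plan => Plan function; foldLeft applies them left to right.
  def preprocess(plan: Plan): Plan =
    Seq(resolveDefaultCollation _, resolveAlterColumnsDataType _)
      .foldLeft(plan) { case (currentPlan, resolver) => resolver(currentPlan) }

  def main(args: Array[String]): Unit =
    println(preprocess(Plan(Nil)).steps) // List(defaultCollation, alterColumnsDataType)
}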
@@ -63,10 +68,14 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
       case ReplaceTable(_: ResolvedIdentifier, _, _, tableSpec: TableSpec, _) =>
         tableSpec.collation

-      // In `transform` we handle these 3 ALTER TABLE commands.
-      case cmd: AddColumns => getCollationFromTableProps(cmd.table)
-      case cmd: ReplaceColumns => getCollationFromTableProps(cmd.table)
-      case cmd: AlterColumns => getCollationFromTableProps(cmd.table)
+      case AddColumns(resolvedTable: ResolvedTable, _) =>
+        Option(resolvedTable.table.properties.get(TableCatalog.PROP_COLLATION))
+
+      case ReplaceColumns(resolvedTable: ResolvedTable, _) =>
+        Option(resolvedTable.table.properties.get(TableCatalog.PROP_COLLATION))
+
+      case AlterColumns(resolvedTable: ResolvedTable, _) =>
+        Option(resolvedTable.table.properties.get(TableCatalog.PROP_COLLATION))

       case alterViewAs: AlterViewAs =>
         alterViewAs.child match {
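These three ALTER TABLE cases now read the collation straight from the resolved table's properties. A small sketch of the Option(javaMap.get(key)) idiom they rely on: Table.properties() is a java.util.Map, so get returns null when the key is absent and Option(...) turns that into None (the property key below is a stand-in, not necessarily TableCatalog's actual constant):

object PropCollationLookupSketch {
  val PROP_COLLATION = "collation" // illustrative stand-in for TableCatalog.PROP_COLLATION

  def collationFromProps(props: java.util.Map[String, String]): Option[String] =
    Option(props.get(PROP_COLLATION)) // null-safe: an absent key becomes None

  def main(args: Array[String]): Unit = {
    val withCollation = new java.util.HashMap[String, String]()
    withCollation.put(PROP_COLLATION, "UNICODE")
    val withoutCollation = new java.util.HashMap[String, String]()

    println(collationFromProps(withCollation))    // Some(UNICODE)
    println(collationFromProps(withoutCollation)) // None
  }
}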
@@ -85,15 +94,6 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
     }
   }

-  private def getCollationFromTableProps(t: LogicalPlan): Option[String] = {
-    t match {
-      case resolvedTbl: ResolvedTable
-        if resolvedTbl.table.properties.containsKey(TableCatalog.PROP_COLLATION) =>
-        Some(resolvedTbl.table.properties.get(TableCatalog.PROP_COLLATION))
-      case _ => None
-    }
-  }
-
   /**
    * Determines the default collation for an object in the following order:
    * 1. Use the object's explicitly defined default collation, if available.
@@ -151,22 +151,86 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
       case p if isCreateOrAlterPlan(p) || AnalysisContext.get.collation.isDefined =>
         transformPlan(p, newType)

-      case addCols: AddColumns =>
+      case addCols @ AddColumns(_: ResolvedTable, _) =>
         addCols.copy(columnsToAdd = replaceColumnTypes(addCols.columnsToAdd, newType))

-      case replaceCols: ReplaceColumns =>
+      case replaceCols @ ReplaceColumns(_: ResolvedTable, _) =>
         replaceCols.copy(columnsToAdd = replaceColumnTypes(replaceCols.columnsToAdd, newType))

-      case a @ AlterColumns(_, specs: Seq[AlterColumnSpec]) =>
+      case a @ AlterColumns(ResolvedTable(_, _, table: Table, _), specs: Seq[AlterColumnSpec]) =>
         val newSpecs = specs.map {
-          case spec if spec.newDataType.isDefined && hasDefaultStringType(spec.newDataType.get) =>
+          case spec if shouldApplyDefaultCollationToAlterColumn(spec, table) =>
             spec.copy(newDataType = Some(replaceDefaultStringType(spec.newDataType.get, newType)))
           case col => col
         }
         a.copy(specs = newSpecs)
     }
   }

+  /**
+   * The column type should not be changed if the original column type is [[StringType]] and the
+   * new type is the default [[StringType]] (i.e., [[StringType]] without an explicit collation).
+   *
+   * Query Example:
+   * {{{
+   *   CREATE TABLE t (c1 STRING COLLATE UNICODE)
+   *   ALTER TABLE t ALTER COLUMN c1 TYPE STRING -- c1 will remain STRING COLLATE UNICODE
+   * }}}
+   */
+  private def resolveAlterColumnsDataType(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case alterColumns @ AlterColumns(
+          ResolvedTable(_, _, table: Table, _), specs: Seq[AlterColumnSpec]) =>
+        val resolvedSpecs = specs.map { spec =>
+          if (spec.newDataType.isDefined && isStringTypeColumn(spec.column, table) &&
+            isDefaultStringType(spec.newDataType.get)) {
+            spec.copy(newDataType = None)
+          } else {
+            spec
+          }
+        }
+        val newAlterColumns = CurrentOrigin.withOrigin(alterColumns.origin) {
+          alterColumns.copy(specs = resolvedSpecs)
+        }
+        newAlterColumns.copyTagsFrom(alterColumns)
+        newAlterColumns
+      case _ =>
+        plan
+    }
+  }
+
+  private def shouldApplyDefaultCollationToAlterColumn(
+      alterColumnSpec: AlterColumnSpec, table: Table): Boolean = {
+    alterColumnSpec.newDataType.isDefined &&
+      // Applies the default collation only if the original column's type is not StringType.
+      !isStringTypeColumn(alterColumnSpec.column, table) &&
+      hasDefaultStringType(alterColumnSpec.newDataType.get)
+  }
+
+  /**
+   * Checks whether the column's [[DataType]] is [[StringType]] in the given table. Throws an
+   * error if the column is not found.
+   */
+  private def isStringTypeColumn(fieldName: FieldName, table: Table): Boolean = {
+    CatalogV2Util.v2ColumnsToStructType(table.columns())
+      .findNestedField(fieldName.name, includeCollections = true, resolver = conf.resolver)
+      .map {
+        case (_, StructField(_, _: CharType, _, _)) =>
+          false
+        case (_, StructField(_, _: VarcharType, _, _)) =>
+          false
+        case (_, StructField(_, _: StringType, _, metadata))
+          if !metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY) =>
+          true
+        case (_, _) =>
+          false
+      }
+      .getOrElse {
+        throw QueryCompilationErrors.unresolvedColumnError(
+          toSQLId(fieldName.name), table.columns().map(_.name))
+      }
+  }
+
   /**
    * Transforms the given plan, by transforming all expressions in its operators to use the given
    * new type instead of the default string type.
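Taken together, resolveAlterColumnsDataType and shouldApplyDefaultCollationToAlterColumn make ALTER COLUMN ... TYPE STRING a no-op for columns that are already plain (non-char/varchar) strings, while non-string columns altered to STRING pick up the object's default collation. A minimal sketch of that decision, using simplified stand-in types rather than Spark's:

object AlterColumnCollationSketch {
  sealed trait SqlType
  final case class StringT(collation: String) extends SqlType
  case object IntT extends SqlType

  val defaultCollation = "UNICODE" // assumed object-level default collation

  // Resulting type of `ALTER COLUMN c TYPE STRING` for a column whose current type is `existing`.
  def alterToPlainString(existing: SqlType): SqlType = existing match {
    case s: StringT => s                         // already a string: keep its original collation
    case _          => StringT(defaultCollation) // non-string: apply the default collation
  }

  def main(args: Array[String]): Unit = {
    println(alterToPlainString(StringT("UNICODE_CI"))) // StringT(UNICODE_CI)
    println(alterToPlainString(IntT))                  // StringT(UNICODE)
  }
}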