
Commit 197a129

[SPARK-52638][SQL] Allow preserving Hive-style column order to be configurable
1 parent: 5e6e8f1

3 files changed: +55 -9 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
@@ -5989,6 +5989,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val PRESERVE_HIVE_COLUMN_ORDER =
+    buildConf("spark.sql.hive.preserveColumnOrder.enabled")
+      .internal()
+      .doc("When true, tables returned from HiveExternalCatalog preserve Hive-style column order " +
+        "where the partition columns are at the end. Otherwise, the user-specified column order " +
+        "is returned.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   /**
    * Holds information about keys that have been deprecated.
    *
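For illustration, the new flag can be flipped like any other SQL conf. A minimal sketch (not part of the commit), assuming a Hive-enabled SparkSession named `spark`:

  // Revert to the user-specified column order when tables are read back
  // from HiveExternalCatalog. The default "true" keeps the existing
  // Hive-style order, with partition columns at the end.
  spark.conf.set("spark.sql.hive.preserveColumnOrder.enabled", "false")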

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 13 additions & 8 deletions
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.SQLConf.PRESERVE_HIVE_COLUMN_ORDER
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -818,16 +819,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // columns are not put at the end of schema. We need to reorder it when reading the schema
   // from the table properties.
   private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
-    val partitionFields = partColumnNames.map { partCol =>
-      schema.find(_.name == partCol).getOrElse {
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3088",
-          messageParameters = Map(
-            "schema" -> schema.catalogString,
-            "partColumnNames" -> partColumnNames.mkString("[", ", ", "]")))
+    if (conf.get(PRESERVE_HIVE_COLUMN_ORDER)) {
+      schema
+    } else {
+      val partitionFields = partColumnNames.map { partCol =>
+        schema.find(_.name == partCol).getOrElse {
+          throw new AnalysisException(
+            errorClass = "_LEGACY_ERROR_TEMP_3088",
+            messageParameters = Map(
+              "schema" -> schema.catalogString,
+              "partColumnNames" -> partColumnNames.mkString("[", ", ", "]")))
+        }
       }
+      StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
     }
-    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
   }
 
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
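To make the non-preserving branch concrete, here is a self-contained sketch (not part of the commit) of the same reordering applied to a plain StructType; the schema and partition column names are made up for illustration:

  import org.apache.spark.sql.types.{IntegerType, StructType}

  // Table declared as (a, b, c, d), partitioned by "a".
  val schema = new StructType()
    .add("a", IntegerType)
    .add("b", IntegerType)
    .add("c", IntegerType)
    .add("d", IntegerType)
  val partColumnNames = Seq("a")

  // The reordering branch moves partition columns to the end (Hive-style).
  val partitionFields = partColumnNames.flatMap(p => schema.find(_.name == p))
  val reordered = StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
  // reordered.fieldNames: Array(b, c, d, a)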

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 32 additions & 1 deletion
@@ -36,9 +36,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableCatalog, TableChange, TableInfo}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
+import org.apache.spark.sql.connector.expressions.Expressions
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
@@ -3432,4 +3433,34 @@ class HiveDDLSuite
       any[String], any[String], any[StructType])
     }
   }
+
+  test("SPARK-52638: Allow preserving Hive-style column order to be configurable") {
+    val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
+    withSQLConf(
+      SQLConf.PRESERVE_HIVE_COLUMN_ORDER.key -> "false"
+    ) {
+      withTable("t1") {
+        val identifier = Identifier.of(Array("default"), "t1")
+        val outputSchema = new StructType()
+          .add("a", IntegerType, true, "comment1")
+          .add("b", IntegerType, true, "comment2")
+          .add("c", IntegerType, true, "comment3")
+          .add("d", IntegerType, true, "comment4")
+        catalog.createTable(
+          identifier,
+          new TableInfo.Builder()
+            .withProperties(Map.empty.asJava)
+            .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
+            .withPartitions(Array(Expressions.identity("a")))
+            .build()
+        )
+        val cols = catalog.loadTable(identifier).columns()
+        assert(cols.length == 4)
+        assert(cols(0).name() == "a")
+        assert(cols(1).name() == "b")
+        assert(cols(2).name() == "c")
+        assert(cols(3).name() == "d")
+      }
+    }
+  }
 }
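As a usage note (not part of the commit): with the flag left at its default of "true", the same table would come back in Hive-style order, with the partition column last. A hedged spark-shell sketch of the contrast, assuming a Hive-enabled session and a hypothetical table t2:

  spark.sql("CREATE TABLE t2 (a INT, b INT, c INT, d INT) USING hive PARTITIONED BY (a)")
  // Default (spark.sql.hive.preserveColumnOrder.enabled = true):
  // the partition column is reported last when the table is read back.
  spark.table("t2").schema.fieldNames  // expected: Array(b, c, d, a)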
