[SPARK-52638][SQL] Allow preserving Hive-style column order to be configurable #51342

Status: Open. Wants to merge 4 commits into base: master.
SQLConf.scala
@@ -5989,6 +5989,16 @@ object SQLConf {
      .booleanConf
      .createWithDefault(true)

  val LEGACY_PRESERVE_HIVE_COLUMN_ORDER =
    buildConf("spark.sql.legacy.preserveHiveColumnOrder")
      .internal()
      .doc("When true, tables created by HiveExternalCatalog will maintain Hive-style column " +
        "order where the partition columns are at the end. Otherwise, use the user-specified " +
        "column order. Does not affect tables with provider = `hive`.")
      .version("4.1.0")
      .booleanConf
      .createWithDefault(true)
Contributor:

I think the default behavior should be to respect the user-specified column order.

Member Author:

I feel it is a behavior change and too risky?
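
For context, a minimal sketch of the observable difference, assuming a Hive-enabled SparkSession and that the flag is in effect when each table is created (table names are illustrative):

    // Default (legacy): the partition column is moved to the end of the schema.
    spark.sql("CREATE TABLE t_legacy (a INT, b INT, c INT) USING parquet PARTITIONED BY (a)")
    spark.table("t_legacy").columns  // Array(b, c, a)

    // With spark.sql.legacy.preserveHiveColumnOrder=false: user-specified order is kept.
    spark.sql("CREATE TABLE t_user (a INT, b INT, c INT) USING parquet PARTITIONED BY (a)")
    spark.table("t_user").columns  // Array(a, b, c)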


  /**
   * Holds information about keys that have been deprecated.
   *
HiveExternalCatalog.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.SQLConf.LEGACY_PRESERVE_HIVE_COLUMN_ORDER
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
@@ -259,6 +260,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
      tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
    }

    if (!conf.get(LEGACY_PRESERVE_HIVE_COLUMN_ORDER)) {
      tableProperties.put(LEGACY_COLUMN_ORDER, "false")
    }

    // we have to set the table schema here so that the table schema JSON
    // string in the table properties still uses the original schema
    val hiveTable = tableDefinition.copy(
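
Since the flag is read through the catalog's SparkConf here, it would typically be set when the session is built. A hedged sketch, not part of this patch:

    import org.apache.spark.sql.SparkSession

    // Hedged sketch: opt newly created tables into user-specified column order.
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .config("spark.sql.legacy.preserveHiveColumnOrder", "false")
      .getOrCreate()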
@@ -893,12 +898,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
    val schemaFromTableProps =
      getSchemaFromTableProperties(table.properties).getOrElse(new StructType())
    val partColumnNames = getPartitionColumnsFromTableProperties(table)
-   val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+   val schema = if (isLegacyColumnOrder(table)) {
+     reorderSchema(schema = schemaFromTableProps, partColumnNames)
+   } else {
+     schemaFromTableProps
+   }

    table.copy(
      provider = Some(provider),
      storage = storageWithoutHiveGeneratedProperties,
-     schema = reorderedSchema,
+     schema = schema,
      partitionColumnNames = partColumnNames,
      bucketSpec = getBucketSpecFromTableProperties(table),
      tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG),
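
reorderSchema itself is outside this diff; as a rough sketch, the legacy path is equivalent to moving the partition columns to the end of the schema (illustration only, assuming partition columns keep their relative order from the schema):

    import org.apache.spark.sql.types.StructType

    // Sketch of the legacy reordering consulted above; not the actual implementation.
    def reorderPartitionsLast(schema: StructType, partCols: Seq[String]): StructType = {
      val (parts, others) = schema.fields.partition(f => partCols.contains(f.name))
      StructType(others ++ parts)
    }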
@@ -1430,6 +1439,8 @@ object HiveExternalCatalog {
  val EMPTY_DATA_SCHEMA = new StructType()
    .add("col", "array<string>", nullable = true, comment = "from deserializer")

  val LEGACY_COLUMN_ORDER = "legacy_column_order"

  private def getColumnNamesByType(
      props: Map[String, String],
      colType: String,
@@ -1489,4 +1500,14 @@ object HiveExternalCatalog {
    case st: StringType => st.isUTF8BinaryCollation
    case _ => true
  }

  // Whether the table was created with the legacy (Hive-style) column order.
  // Absence of the table property means we preserve the legacy column order.
  private def isLegacyColumnOrder(table: CatalogTable): Boolean = {
    table.properties.get(LEGACY_COLUMN_ORDER) match {
      case Some(l) if l.equalsIgnoreCase("false") => false
      case _ => true
    }
  }
}
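
The default-to-true fallback above is what keeps pre-existing tables stable: tables written before this patch carry no marker and keep the reordered layout. A small illustration of the read-side rule (hypothetical helper, mirroring isLegacyColumnOrder):

    def isLegacy(props: Map[String, String]): Boolean =
      !props.get("legacy_column_order").exists(_.equalsIgnoreCase("false"))

    assert(isLegacy(Map.empty))                              // table from an older release
    assert(!isLegacy(Map("legacy_column_order" -> "false"))) // created with the new conf off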
HiveDDLSuite.scala
@@ -36,9 +36,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableCatalog, TableChange, TableInfo}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.connector.expressions.Expressions
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
@@ -3432,4 +3433,69 @@ class HiveDDLSuite
any[String], any[String], any[StructType])
}
}

test("SPARK-52638: Allow preserving Hive-style column order to be configurable") {
val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
Seq(true, false).map { preserveOrder => {
withSQLConf(SQLConf.LEGACY_PRESERVE_HIVE_COLUMN_ORDER.key -> preserveOrder.toString) {
withTable("t1") {
val identifier = Identifier.of(Array("default"), "t1")
val outputSchema = new StructType()
.add("a", IntegerType, true, "comment1")
.add("b", IntegerType, true, "comment2")
.add("c", IntegerType, true, "comment3")
.add("d", IntegerType, true, "comment4")
catalog.createTable(
identifier,
new TableInfo.Builder()
.withProperties(Map.empty.asJava)
.withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
.withPartitions(Array(Expressions.identity("a")))
.build()
)
val table1 = catalog.loadTable(identifier)
val cols = table1.columns()

if (preserveOrder) {
assert(cols.length == 4)
assert(cols(0).name() == "a")
assert(cols(1).name() == "b")
assert(cols(2).name() == "c")
assert(cols(3).name() == "d")
assert(table1.properties().get("spark.sql.legacy.preserveHiveColumnOrder") == "false")
} else {
assert(cols.length == 4)
assert(cols(0).name() == "b")
assert(cols(1).name() == "c")
assert(cols(2).name() == "d")
assert(cols(3).name() == "a")
}

catalog.alterTable(
identifier,
TableChange.addColumn(Array("e"), IntegerType)
)

val table2 = catalog.loadTable(identifier)
val cols2 = table2.columns()
if (preserveOrder) {
assert(cols2.length == 5)
assert(cols2(0).name() == "a")
assert(cols2(1).name() == "b")
assert(cols2(2).name() == "c")
assert(cols2(3).name() == "d")
assert(cols2(4).name() == "e")
assert(table2.properties().get("spark.sql.legacy.preserveHiveColumnOrder") == "false")
} else {
assert(cols2(0).name() == "b")
assert(cols2(1).name() == "c")
assert(cols2(2).name() == "d")
assert(cols2(3).name() == "e")
assert(cols2(4).name() == "a")
}
}
}
}
}
}
}