
Commit dc6d9f4

chenhao-db authored and yhuang-db committed
[SPARK-52267][SQL] Match field ID in ParquetToSparkSchemaConverter
### What changes were proposed in this pull request?

In the vectorized Parquet reader, there are two classes that resolve the Parquet schema when reading a Parquet file:

- `ParquetReadSupport`: it clips the Parquet schema to include only the parts used by the Spark requested schema. The matching considers both field name and field ID.
- `ParquetToSparkSchemaConverter`: it resolves the Parquet schema to a Spark type by connecting it to the Spark requested schema. The matching considers only field name.

When the field ID matches but the field name doesn't, the first step clips the Parquet schema to the same structure as the Spark requested schema, as expected. In the second step, however, the Parquet type cannot be connected to a Spark type in the requested schema, so a Spark type is inferred from it. This usually works as expected when the inferred type matches the requested type, but the two can differ while the read is still valid. For example, the Parquet type may be `int` while the Spark type is `long`. In this case, the vectorized Parquet reader produces `int` data in the column vectors, which subsequent operations then interpret as `long` data. This can happen in real user cases if an Iceberg table with both a rename and a column type change (int -> long) is converted into a Delta table, although this situation may be very rare.

This PR fixes the bug by matching on field ID in `ParquetToSparkSchemaConverter` when the name cannot be matched. I know that `ParquetReadSupport` gives priority to the field ID when it exists, but I am not fully confident about making the same change here and would like to keep the semantic change minimal.

### Why are the changes needed?

It fixes a correctness issue.

### Does this PR introduce _any_ user-facing change?

Yes, as stated above.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50990 from chenhao-db/ParquetToSparkSchemaConverter_fieldId.

Authored-by: Chenhao Li <chenhao.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
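As an end-to-end illustration of the scenario described above (not part of the commit), here is a minimal sketch. The object name, path, and session setup are assumptions for the example; the `parquet.field.id` column metadata key and the two `fieldId` SQL confs are the ones Spark exposes:

```scala
import java.util.Arrays

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, Metadata, MetadataBuilder, StructType}

object FieldIdMismatchRepro {
  // Attach a Parquet field ID to a column through its metadata
  // (Spark reads and writes field IDs under the "parquet.field.id" key).
  private def withId(id: Int): Metadata =
    new MetadataBuilder().putLong("parquet.field.id", id).build()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.sql.parquet.fieldId.write.enabled", "true")
      .config("spark.sql.parquet.fieldId.read.enabled", "true")
      .getOrCreate()

    val path = "/tmp/spark-52267-repro" // assumed scratch location

    // Writer side: column "id2" of type int, field ID 1.
    val writeSchema = new StructType().add("id2", IntegerType, nullable = true, withId(1))
    spark.createDataFrame(Arrays.asList(Row(1), Row(2), Row(3)), writeSchema)
      .write.mode("overwrite").parquet(path)

    // Reader side: column renamed to "id1" and widened to long, same field ID 1.
    // ParquetReadSupport clips the file schema by ID, but before this fix
    // ParquetToSparkSchemaConverter matched by name only, so the column vectors
    // could hold int data that downstream operators then read as long.
    val readSchema = new StructType().add("id1", LongType, nullable = true, withId(1))
    spark.read.schema(readSchema).parquet(path).show()

    spark.stop()
  }
}
```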
1 parent a3cce98 commit dc6d9f4

File tree

2 files changed, +53 -4 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala

Lines changed: 25 additions & 3 deletions

```diff
@@ -57,21 +57,25 @@ class ParquetToSparkSchemaConverter(
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
     caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
     inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get,
-    nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
+    nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get,
+    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
     caseSensitive = conf.caseSensitiveAnalysis,
     inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
-    nanosAsLong = conf.legacyParquetNanosAsLong)
+    nanosAsLong = conf.legacyParquetNanosAsLong,
+    useFieldId = conf.parquetFieldIdReadEnabled)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
     caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
     inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
-    nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
+    nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean,
+    useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key,
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get))
 
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -107,11 +111,29 @@ class ParquetToSparkSchemaConverter(
     val schemaMapOpt = sparkReadSchema.map { schema =>
       schema.map(f => normalizeFieldName(f.name) -> f).toMap
     }
+    // Use ID mapping only when the name mapping doesn't find a match.
+    lazy val schemaIdMapOpt = sparkReadSchema match {
+      case Some(schema) if useFieldId =>
+        Some(schema.fields.flatMap { f =>
+          if (ParquetUtils.hasFieldId(f)) {
+            Some((ParquetUtils.getFieldId(f), f))
+          } else {
+            None
+          }
+        }.toMap)
+      case _ => None
+    }
 
     val converted = (0 until groupColumn.getChildrenCount).map { i =>
       val field = groupColumn.getChild(i)
       val fieldFromReadSchema = schemaMapOpt.flatMap { schemaMap =>
         schemaMap.get(normalizeFieldName(field.getName))
+      }.orElse {
+        val parquetFieldId = Option(field.getType.getId).map(_.intValue())
+        (parquetFieldId, schemaIdMapOpt) match {
+          case (Some(id), Some(map)) => map.get(id)
+          case _ => None
+        }
       }
       var fieldReadType = fieldFromReadSchema.map(_.dataType)
```

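In isolation, the resolution order this hunk introduces can be sketched as follows. The types and maps below are simplified stand-ins for the real Parquet/Spark classes (the real code uses `ParquetUtils.hasFieldId`/`getFieldId` and Parquet's `Type.getId`), kept here only to show the name-first, ID-second lookup:

```scala
object FieldResolutionSketch {
  // Simplified stand-in for a field of the Spark requested schema.
  final case class SparkField(name: String, dataType: String)

  // Name matching takes priority; the field-ID map is consulted only on a
  // miss, mirroring the orElse fallback added by the patch.
  def resolve(
      parquetName: String,
      parquetFieldId: Option[Int],
      nameMap: Map[String, SparkField],
      idMap: Map[Int, SparkField]): Option[SparkField] =
    nameMap.get(parquetName).orElse(parquetFieldId.flatMap(idMap.get))

  def main(args: Array[String]): Unit = {
    val nameMap = Map("id1" -> SparkField("id1", "long"))
    val idMap = Map(1 -> SparkField("id1", "long"))
    // Parquet column "id2" misses by name but hits by field ID 1.
    println(resolve("id2", Some(1), nameMap, idMap)) // Some(SparkField(id1,long))
    // Without a field ID, an unmatched name correctly resolves to None.
    println(resolve("id2", None, nameMap, idMap)) // None
  }
}
```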
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala

Lines changed: 28 additions & 1 deletion

```diff
@@ -23,7 +23,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata, MetadataBuilder, StringType, StructType}
+import org.apache.spark.sql.types._
 
 class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession {
 
@@ -239,4 +239,31 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
       }
     }
   }
+
+  test("SPARK-52267: Field ID mapping when field name doesn't match") {
+    withTempDir { dir =>
+      val readSchema = new StructType().add("id1", LongType, true, withId(1))
+      val writeSchema = new StructType().add("id2", IntegerType, true, withId(1))
+
+      withSQLConf(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key -> "true") {
+        val writeData = Seq(Row(1), Row(2), Row(3))
+        spark.createDataFrame(writeData.asJava, writeSchema)
+          .write.mode("overwrite").parquet(dir.getCanonicalPath)
+      }
+
+      withAllParquetReaders {
+        withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "false") {
+          checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath),
+            Seq(Row(null), Row(null), Row(null)))
+        }
+        // Without the fix, the result is unpredictable when PARQUET_FIELD_ID_READ_ENABLED is
+        // enabled. It could cause NPE if OnHeapColumnVector is used in the scan. It could produce
+        // incorrect results if OffHeapColumnVector is used.
+        withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") {
+          checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath),
+            Seq(Row(1L), Row(2L), Row(3L)))
+        }
+      }
+    }
+  }
 }
```
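The test relies on a `withId` helper defined elsewhere in the suite (outside this diff). A sketch of its presumed shape, assuming it attaches the field ID through column metadata under Spark's `parquet.field.id` key; the object wrapper and the printed check are illustrative only:

```scala
import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructType}

object WithIdSketch {
  // Presumed helper: record a Parquet field ID in the column's metadata,
  // under the "parquet.field.id" key that Spark's Parquet code looks for.
  def withId(id: Int): Metadata =
    new MetadataBuilder().putLong("parquet.field.id", id).build()

  def main(args: Array[String]): Unit = {
    val schema = new StructType().add("id2", IntegerType, nullable = true, withId(1))
    println(schema.fields.head.metadata) // prints the field-ID metadata as JSON
  }
}
```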
