
Commit 0757f2a

djspiewak authored and yhuang-db committed
[SPARK-52240] Corrected row index usage when exploding packed arrays in vectorized reader
This PR fixes an issue in the vectorized Parquet reader when executing the `explode` function on nested arrays that cut across two or more pages. It's probably possible to minimize this slightly more, but I wasn't able to find a smaller reproducer. It's also worth noting that this issue illustrates a current gap in the lower-level unit tests for the vectorized reader, which don't appear to test much related to output vector offsets.

The bug in question was a simple typo: the output row offset was used to dereference nested array lengths rather than the input row offset. This only matters for the `explode` function, and then only when resuming the same operation on a second page. This case (and all related cases) is, at present, untested. I added a high-level test and an example `.parquet` file which reproduces the issue and verifies the fix, but it would be ideal if more tests were added at a lower level. It is very likely that other similar bugs are present within the vectorized reader as it relates to nested substructures remapped during the query pipeline.

### What changes were proposed in this pull request?

A fairly straightforward typo fix in the code.

### Why are the changes needed?

The vectorized Parquet reader does not correctly handle this case.

### Does this PR introduce _any_ user-facing change?

Aside from fixing the vectorized reader? No.

### How was this patch tested?

A unit test (well, more of an integration test) is included in the PR.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46928 from djspiewak/bug/packed-list-vectorized.

Authored-by: Daniel Spiewak <dspiewak@nvidia.com>
Signed-off-by: Chao Sun <chao@openai.com>
1 parent 561c21d commit 0757f2a

File tree

3 files changed: +11 −1 lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ public void readBinary(int total, WritableColumnVector c, int rowId) {
       ByteBufferOutputWriter outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
       int length;
       for (int i = 0; i < total; i++) {
-        length = lengthsVector.getInt(rowId + i);
+        length = lengthsVector.getInt(currentRow + i);
         try {
           buffer = in.slice(length);
         } catch (EOFException e) {
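To see why the one-character fix matters, consider what the two offsets mean: `rowId` is the global position in the output vector and keeps growing across pages, while `currentRow` is the position within the current page's lengths vector and resets when a new page is loaded. The following is a minimal standalone sketch (not Spark code; `readLengths`, the arrays, and the page sizes are all hypothetical) illustrating how indexing a page-local lengths vector with the global output offset goes wrong when an operation resumes on a second page:

```java
import java.util.Arrays;

public class OffsetSketch {
    // Loosely mimics the reader's readBinary loop: copy `total` lengths
    // from a page-local vector into a global output array.
    static void readLengths(int[] lengthsVector, int total, int currentRow,
                            int rowId, int[] out) {
        for (int i = 0; i < total; i++) {
            // The fix: dereference lengths with the page-local input offset.
            out[rowId + i] = lengthsVector[currentRow + i];
            // Using the output offset instead (lengthsVector[rowId + i])
            // indexes past the page once rowId outgrows it.
        }
    }

    public static void main(String[] args) {
        int[] page1 = {3, 1, 2};
        int[] page2 = {2, 4};
        int[] out = new int[5];

        // First page: rowId and currentRow coincide, so the bug is hidden.
        readLengths(page1, 3, 0, 0, out);
        // Resumed on a second page: rowId = 3 but currentRow resets to 0.
        // lengthsVector[rowId + i] would throw ArrayIndexOutOfBoundsException
        // here (or silently misread lengths on a larger page).
        readLengths(page2, 2, 0, 3, out);

        System.out.println(Arrays.toString(out)); // [3, 1, 2, 2, 4]
    }
}
```

Since plain scans consume pages in lockstep (`rowId` equals `currentRow`), only an operation like `explode`, which remaps offsets mid-stream, exposes the divergence.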
Binary file not shown.

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 10 additions & 0 deletions
@@ -1307,6 +1307,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }

+  test("explode nested lists crossing a rowgroup boundary") {
+    withAllParquetReaders {
+      checkAnswer(
+        readResourceParquetFile("test-data/packed-list-vectorized.parquet")
+          .selectExpr("explode(DIStatus.command_status.actions_status)")
+          .selectExpr("col.result"),
+        List.fill(4992)(Row("SUCCESS")))
+    }
+  }
+
   test("read dictionary encoded decimals written as INT64") {
     withAllParquetReaders {
       checkAnswer(
