From 301e74fb98e218e2da3ea6536baf0ab1766a325d Mon Sep 17 00:00:00 2001 From: Ahmed Mezghani Date: Tue, 28 Oct 2025 21:40:33 +0100 Subject: [PATCH 1/3] feat: support conversion of chunked arrays --- parquet/pqarrow/column_readers.go | 21 ++-- parquet/pqarrow/column_readers_test.go | 150 +++++++++++++++++++++++++ parquet/pqarrow/file_reader.go | 4 +- 3 files changed, 167 insertions(+), 8 deletions(-) create mode 100644 parquet/pqarrow/column_readers_test.go diff --git a/parquet/pqarrow/column_readers.go b/parquet/pqarrow/column_readers.go index 3ea81a71..1be8a48e 100644 --- a/parquet/pqarrow/column_readers.go +++ b/parquet/pqarrow/column_readers.go @@ -319,7 +319,7 @@ func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) { return nil, err } - childArrData[i], err = chunksToSingle(field) + childArrData[i], err = chunksToSingle(field, sr.rctx.mem) field.Release() // release field before checking if err != nil { return nil, err @@ -442,7 +442,7 @@ func (lr *listReader) BuildArray(lenBound int64) (*arrow.Chunked, error) { validityBuffer.Resize(int(bitutil.BytesForBits(validityIO.Read))) } - item, err := chunksToSingle(arr) + item, err := chunksToSingle(arr, lr.rctx.mem) if err != nil { return nil, err } @@ -489,9 +489,7 @@ func newFixedSizeListReader(rctx *readerCtx, field *arrow.Field, info file.Level } // helper function to combine chunks into a single array. -// -// nested data conversion for chunked array outputs not yet implemented -func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) { +func chunksToSingle(chunked *arrow.Chunked, mem memory.Allocator) (arrow.ArrayData, error) { switch len(chunked.Chunks()) { case 0: return array.NewData(chunked.DataType(), 0, []*memory.Buffer{nil, nil}, nil, 0, 0), nil @@ -499,8 +497,17 @@ func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) { data := chunked.Chunk(0).Data() data.Retain() // we pass control to the caller return data, nil - default: // if an item reader yields a chunked array, this is not yet implemented - return nil, arrow.ErrNotImplemented + default: + // concatenate multiple chunks into a single array + concatenated, err := array.Concatenate(chunked.Chunks(), mem) + if err != nil { + return nil, err + } + defer concatenated.Release() + + data := concatenated.Data() + data.Retain() + return data, nil } } diff --git a/parquet/pqarrow/column_readers_test.go b/parquet/pqarrow/column_readers_test.go new file mode 100644 index 00000000..77a5253f --- /dev/null +++ b/parquet/pqarrow/column_readers_test.go @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package pqarrow + +import ( + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestChunksToSingle(t *testing.T) { + mem := memory.NewGoAllocator() + + t.Run("empty chunked array", func(t *testing.T) { + chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{}) + defer chunked.Release() + + result, err := chunksToSingle(chunked, mem) + require.NoError(t, err) + defer result.Release() + + assert.Equal(t, 0, result.Len()) + }) + + t.Run("single chunk", func(t *testing.T) { + bldr := array.NewInt32Builder(mem) + defer bldr.Release() + bldr.AppendValues([]int32{1, 2, 3, 4, 5}, nil) + arr := bldr.NewInt32Array() + defer arr.Release() + + chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{arr}) + defer chunked.Release() + + result, err := chunksToSingle(chunked, mem) + require.NoError(t, err) + defer result.Release() + + assert.Equal(t, 5, result.Len()) + }) + + t.Run("multiple chunks", func(t *testing.T) { + bldr := array.NewInt32Builder(mem) + defer bldr.Release() + + bldr.AppendValues([]int32{1, 2, 3}, nil) + chunk1 := bldr.NewInt32Array() + defer chunk1.Release() + + bldr.AppendValues([]int32{4, 5, 6}, nil) + chunk2 := bldr.NewInt32Array() + defer chunk2.Release() + + bldr.AppendValues([]int32{7, 8, 9, 10}, nil) + chunk3 := bldr.NewInt32Array() + defer chunk3.Release() + + chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2, chunk3}) + defer chunked.Release() + + result, err := chunksToSingle(chunked, mem) + require.NoError(t, err) + defer result.Release() + + assert.Equal(t, 10, result.Len()) + + // Verify concatenated values + resultArr := array.MakeFromData(result).(*array.Int32) + defer resultArr.Release() + for i, expected := range []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} { + assert.Equal(t, expected, resultArr.Value(i)) + } + }) + + t.Run("multiple chunks with nulls", func(t *testing.T) { + bldr := array.NewInt32Builder(mem) + defer bldr.Release() + + bldr.AppendValues([]int32{1, 0, 3}, []bool{true, false, true}) + chunk1 := bldr.NewInt32Array() + defer chunk1.Release() + + bldr.AppendValues([]int32{4, 0, 6}, []bool{true, false, true}) + chunk2 := bldr.NewInt32Array() + defer chunk2.Release() + + chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2}) + defer chunked.Release() + + result, err := chunksToSingle(chunked, mem) + require.NoError(t, err) + defer result.Release() + + assert.Equal(t, 6, result.Len()) + assert.Equal(t, 2, result.NullN()) + + resultArr := array.MakeFromData(result).(*array.Int32) + defer resultArr.Release() + assert.False(t, resultArr.IsValid(1)) + assert.False(t, resultArr.IsValid(4)) + assert.Equal(t, int32(1), resultArr.Value(0)) + assert.Equal(t, int32(3), resultArr.Value(2)) + }) + + t.Run("multiple chunks string type", func(t *testing.T) { + bldr := array.NewStringBuilder(mem) + defer bldr.Release() + + bldr.AppendValues([]string{"hello", "world"}, nil) + chunk1 := bldr.NewStringArray() + defer chunk1.Release() + + bldr.AppendValues([]string{"arrow", "parquet"}, nil) + chunk2 := bldr.NewStringArray() + defer chunk2.Release() + + chunked := arrow.NewChunked(arrow.BinaryTypes.String, []arrow.Array{chunk1, chunk2}) + defer chunked.Release() + + result, err := chunksToSingle(chunked, mem) + require.NoError(t, err) + defer result.Release() + + assert.Equal(t, 4, result.Len()) 
+
+		resultArr := array.MakeFromData(result).(*array.String)
+		defer resultArr.Release()
+		assert.Equal(t, "hello", resultArr.Value(0))
+		assert.Equal(t, "parquet", resultArr.Value(3))
+	})
+}
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index c85b2fc9..f59a589e 100644
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -527,6 +527,7 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
 		parallel:     fr.Props.Parallel,
 		sc:           sc,
 		fieldReaders: readers,
+		mem:          fr.mem,
 	}
 	rr.refCount.Add(1)
 	return rr, nil
@@ -721,6 +722,7 @@ type recordReader struct {
 	fieldReaders []*ColumnReader
 	cur          arrow.RecordBatch
 	err          error
+	mem          memory.Allocator
 
 	refCount atomic.Int64
 }
@@ -789,7 +791,7 @@ func (r *recordReader) next() bool {
 			return io.EOF
 		}
 
-		arrdata, err := chunksToSingle(data)
+		arrdata, err := chunksToSingle(data, r.mem)
 		if err != nil {
 			return err
 		}

From dc63c0951fd7212d5138f8a13db252b15dc5d473 Mon Sep 17 00:00:00 2001
From: Ahmed Mezghani
Date: Wed, 29 Oct 2025 05:56:25 +0100
Subject: [PATCH 2/3] use CheckedAllocator in test

---
 parquet/pqarrow/column_readers_test.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/parquet/pqarrow/column_readers_test.go b/parquet/pqarrow/column_readers_test.go
index 77a5253f..3789af51 100644
--- a/parquet/pqarrow/column_readers_test.go
+++ b/parquet/pqarrow/column_readers_test.go
@@ -28,7 +28,8 @@ import (
 )
 
 func TestChunksToSingle(t *testing.T) {
-	mem := memory.NewGoAllocator()
+	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+	defer mem.AssertSize(t, 0)
 
 	t.Run("empty chunked array", func(t *testing.T) {
 		chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{})

From 43fd664ea76f69f0324c41f64fc41c59ef88ef5f Mon Sep 17 00:00:00 2001
From: Ahmed Mezghani
Date: Wed, 29 Oct 2025 06:12:42 +0100
Subject: [PATCH 3/3] add TestChunkedTableRoundTrip

---
 parquet/pqarrow/column_readers_test.go | 124 +++++++++++++++++++++++++
 1 file changed, 124 insertions(+)

diff --git a/parquet/pqarrow/column_readers_test.go b/parquet/pqarrow/column_readers_test.go
index 3789af51..6a86b3f9 100644
--- a/parquet/pqarrow/column_readers_test.go
+++ b/parquet/pqarrow/column_readers_test.go
@@ -17,11 +17,17 @@
 package pqarrow
 
 import (
+	"bytes"
+	"context"
+	"io"
 	"testing"
 
 	"github.com/apache/arrow-go/v18/arrow"
 	"github.com/apache/arrow-go/v18/arrow/array"
 	"github.com/apache/arrow-go/v18/arrow/memory"
+	"github.com/apache/arrow-go/v18/parquet"
+	"github.com/apache/arrow-go/v18/parquet/compress"
+	"github.com/apache/arrow-go/v18/parquet/file"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -149,3 +155,121 @@ func TestChunksToSingle(t *testing.T) {
 		assert.Equal(t, "parquet", resultArr.Value(3))
 	})
 }
+
+func TestChunkedTableRoundTrip(t *testing.T) {
+	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+	defer mem.AssertSize(t, 0)
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "int64_col", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+			{Name: "string_col", Type: arrow.BinaryTypes.String, Nullable: true},
+		},
+		nil,
+	)
+
+	// Test data across 3 chunks: 5 + 3 + 2 = 10 rows
+	allInt64Values := []int64{10, 20, 30, 40, 50, 60, 70, 80, 90, 100}
+	allStringValues := []string{"hello", "world", "arrow", "parquet", "go", "chunked", "table", "test", "final", "chunk"}
+
+	var buf bytes.Buffer
+	props := parquet.NewWriterProperties(
+		parquet.WithCompression(compress.Codecs.Snappy),
+		parquet.WithAllocator(mem),
+	)
+
+	writerProps := NewArrowWriterProperties(
+		WithAllocator(mem),
+	)
+
+	writer, err := NewFileWriter(schema, &buf, props, writerProps)
+	require.NoError(t, err)
+
+	// Write three chunks: 5 rows, 3 rows, 2 rows
+	chunks := []struct{ start, end int }{
+		{0, 5},  // First chunk: 5 rows
+		{5, 8},  // Second chunk: 3 rows
+		{8, 10}, // Third chunk: 2 rows
+	}
+
+	for _, chunk := range chunks {
+		int64Builder := array.NewInt64Builder(mem)
+		int64Builder.AppendValues(allInt64Values[chunk.start:chunk.end], nil)
+		int64Arr := int64Builder.NewInt64Array()
+		int64Builder.Release()
+
+		stringBuilder := array.NewStringBuilder(mem)
+		stringBuilder.AppendValues(allStringValues[chunk.start:chunk.end], nil)
+		stringArr := stringBuilder.NewStringArray()
+		stringBuilder.Release()
+
+		rec := array.NewRecordBatch(schema, []arrow.Array{int64Arr, stringArr}, int64(chunk.end-chunk.start))
+
+		err = writer.Write(rec)
+		require.NoError(t, err)
+
+		rec.Release()
+		int64Arr.Release()
+		stringArr.Release()
+	}
+
+	err = writer.Close()
+	require.NoError(t, err)
+
+	// Read back from parquet
+	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()),
+		file.WithReadProps(parquet.NewReaderProperties(mem)))
+	require.NoError(t, err)
+	defer pf.Close()
+
+	reader, err := NewFileReader(pf, ArrowReadProperties{}, mem)
+	require.NoError(t, err)
+
+	rr, err := reader.GetRecordReader(context.Background(), nil, nil)
+	require.NoError(t, err)
+	defer rr.Release()
+
+	var records []arrow.RecordBatch
+	for {
+		rec, err := rr.Read()
+		if err == io.EOF {
+			break
+		}
+		require.NoError(t, err)
+		rec.Retain()
+		records = append(records, rec)
+	}
+
+	readTable := array.NewTableFromRecords(schema, records)
+	defer readTable.Release()
+
+	for _, rec := range records {
+		rec.Release()
+	}
+
+	// Verify the read table
+	require.Equal(t, int64(10), readTable.NumRows())
+	require.Equal(t, int64(2), readTable.NumCols())
+
+	// Verify int64 column values
+	int64Col := readTable.Column(0).Data()
+	int64Single, err := chunksToSingle(int64Col, mem)
+	require.NoError(t, err)
+	defer int64Single.Release()
+	int64Arr := array.MakeFromData(int64Single).(*array.Int64)
+	defer int64Arr.Release()
+	for i := 0; i < int64Arr.Len(); i++ {
+		assert.Equal(t, allInt64Values[i], int64Arr.Value(i))
+	}
+
+	// Verify string column values
+	stringCol := readTable.Column(1).Data()
+	stringSingle, err := chunksToSingle(stringCol, mem)
+	require.NoError(t, err)
+	defer stringSingle.Release()
+	stringArr := array.MakeFromData(stringSingle).(*array.String)
+	defer stringArr.Release()
+	for i := 0; i < stringArr.Len(); i++ {
+		assert.Equal(t, allStringValues[i], stringArr.Value(i))
+	}
+}
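
Note for reviewers: the new default branch of chunksToSingle delegates to
array.Concatenate from the arrow/array package, which copies all chunks into
one contiguous array allocated from the supplied allocator. Multi-chunk column
output typically arises when a single array cannot hold the column's data (for
example, very large binary columns); previously such reads failed with
arrow.ErrNotImplemented. Below is a minimal standalone sketch of the
underlying operation (illustrative only, not part of the patches; the variable
names and values are invented):

	package main

	import (
		"fmt"

		"github.com/apache/arrow-go/v18/arrow"
		"github.com/apache/arrow-go/v18/arrow/array"
		"github.com/apache/arrow-go/v18/arrow/memory"
	)

	func main() {
		mem := memory.DefaultAllocator

		// Build two int32 chunks, mimicking a column that a reader
		// produced as a multi-chunk arrow.Chunked.
		bldr := array.NewInt32Builder(mem)
		defer bldr.Release()

		bldr.AppendValues([]int32{1, 2, 3}, nil)
		chunk1 := bldr.NewInt32Array()
		defer chunk1.Release()

		bldr.AppendValues([]int32{4, 5}, nil)
		chunk2 := bldr.NewInt32Array()
		defer chunk2.Release()

		chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2})
		defer chunked.Release()

		// The call the patched default branch makes: one contiguous
		// array is allocated from mem and the chunk contents are
		// copied into it.
		combined, err := array.Concatenate(chunked.Chunks(), mem)
		if err != nil {
			panic(err)
		}
		defer combined.Release()

		fmt.Println(combined) // prints: [1 2 3 4 5]
	}

Two consequences worth noting: multi-chunk inputs now pay a copy proportional
to the column size (the empty and single-chunk cases keep their zero-copy
behavior), and the Retain/Release pairing in the new branch follows the
package convention: concatenated.Release() drops the temporary array wrapper
when chunksToSingle returns, while the preceding data.Retain() keeps the
underlying buffers alive for the caller.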