Skip to content

Commit 301e74f

Browse files
committed
feat: support conversion of chunked arrays
1 parent 9b9fdd0 commit 301e74f

File tree

3 files changed

+167
-8
lines changed

3 files changed

+167
-8
lines changed

parquet/pqarrow/column_readers.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
319319
return nil, err
320320
}
321321

322-
childArrData[i], err = chunksToSingle(field)
322+
childArrData[i], err = chunksToSingle(field, sr.rctx.mem)
323323
field.Release() // release field before checking
324324
if err != nil {
325325
return nil, err
@@ -442,7 +442,7 @@ func (lr *listReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
442442
validityBuffer.Resize(int(bitutil.BytesForBits(validityIO.Read)))
443443
}
444444

445-
item, err := chunksToSingle(arr)
445+
item, err := chunksToSingle(arr, lr.rctx.mem)
446446
if err != nil {
447447
return nil, err
448448
}
@@ -489,18 +489,25 @@ func newFixedSizeListReader(rctx *readerCtx, field *arrow.Field, info file.Level
489489
}
490490

491491
// helper function to combine chunks into a single array.
492-
//
493-
// nested data conversion for chunked array outputs not yet implemented
494-
func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) {
492+
func chunksToSingle(chunked *arrow.Chunked, mem memory.Allocator) (arrow.ArrayData, error) {
495493
switch len(chunked.Chunks()) {
496494
case 0:
497495
return array.NewData(chunked.DataType(), 0, []*memory.Buffer{nil, nil}, nil, 0, 0), nil
498496
case 1:
499497
data := chunked.Chunk(0).Data()
500498
data.Retain() // we pass control to the caller
501499
return data, nil
502-
default: // if an item reader yields a chunked array, this is not yet implemented
503-
return nil, arrow.ErrNotImplemented
500+
default:
501+
// concatenate multiple chunks into a single array
502+
concatenated, err := array.Concatenate(chunked.Chunks(), mem)
503+
if err != nil {
504+
return nil, err
505+
}
506+
defer concatenated.Release()
507+
508+
data := concatenated.Data()
509+
data.Retain()
510+
return data, nil
504511
}
505512
}
506513

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package pqarrow
18+
19+
import (
20+
"testing"
21+
22+
"github.com/apache/arrow-go/v18/arrow"
23+
"github.com/apache/arrow-go/v18/arrow/array"
24+
"github.com/apache/arrow-go/v18/arrow/memory"
25+
26+
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/require"
28+
)
29+
30+
func TestChunksToSingle(t *testing.T) {
31+
mem := memory.NewGoAllocator()
32+
33+
t.Run("empty chunked array", func(t *testing.T) {
34+
chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{})
35+
defer chunked.Release()
36+
37+
result, err := chunksToSingle(chunked, mem)
38+
require.NoError(t, err)
39+
defer result.Release()
40+
41+
assert.Equal(t, 0, result.Len())
42+
})
43+
44+
t.Run("single chunk", func(t *testing.T) {
45+
bldr := array.NewInt32Builder(mem)
46+
defer bldr.Release()
47+
bldr.AppendValues([]int32{1, 2, 3, 4, 5}, nil)
48+
arr := bldr.NewInt32Array()
49+
defer arr.Release()
50+
51+
chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{arr})
52+
defer chunked.Release()
53+
54+
result, err := chunksToSingle(chunked, mem)
55+
require.NoError(t, err)
56+
defer result.Release()
57+
58+
assert.Equal(t, 5, result.Len())
59+
})
60+
61+
t.Run("multiple chunks", func(t *testing.T) {
62+
bldr := array.NewInt32Builder(mem)
63+
defer bldr.Release()
64+
65+
bldr.AppendValues([]int32{1, 2, 3}, nil)
66+
chunk1 := bldr.NewInt32Array()
67+
defer chunk1.Release()
68+
69+
bldr.AppendValues([]int32{4, 5, 6}, nil)
70+
chunk2 := bldr.NewInt32Array()
71+
defer chunk2.Release()
72+
73+
bldr.AppendValues([]int32{7, 8, 9, 10}, nil)
74+
chunk3 := bldr.NewInt32Array()
75+
defer chunk3.Release()
76+
77+
chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2, chunk3})
78+
defer chunked.Release()
79+
80+
result, err := chunksToSingle(chunked, mem)
81+
require.NoError(t, err)
82+
defer result.Release()
83+
84+
assert.Equal(t, 10, result.Len())
85+
86+
// Verify concatenated values
87+
resultArr := array.MakeFromData(result).(*array.Int32)
88+
defer resultArr.Release()
89+
for i, expected := range []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
90+
assert.Equal(t, expected, resultArr.Value(i))
91+
}
92+
})
93+
94+
t.Run("multiple chunks with nulls", func(t *testing.T) {
95+
bldr := array.NewInt32Builder(mem)
96+
defer bldr.Release()
97+
98+
bldr.AppendValues([]int32{1, 0, 3}, []bool{true, false, true})
99+
chunk1 := bldr.NewInt32Array()
100+
defer chunk1.Release()
101+
102+
bldr.AppendValues([]int32{4, 0, 6}, []bool{true, false, true})
103+
chunk2 := bldr.NewInt32Array()
104+
defer chunk2.Release()
105+
106+
chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2})
107+
defer chunked.Release()
108+
109+
result, err := chunksToSingle(chunked, mem)
110+
require.NoError(t, err)
111+
defer result.Release()
112+
113+
assert.Equal(t, 6, result.Len())
114+
assert.Equal(t, 2, result.NullN())
115+
116+
resultArr := array.MakeFromData(result).(*array.Int32)
117+
defer resultArr.Release()
118+
assert.False(t, resultArr.IsValid(1))
119+
assert.False(t, resultArr.IsValid(4))
120+
assert.Equal(t, int32(1), resultArr.Value(0))
121+
assert.Equal(t, int32(3), resultArr.Value(2))
122+
})
123+
124+
t.Run("multiple chunks string type", func(t *testing.T) {
125+
bldr := array.NewStringBuilder(mem)
126+
defer bldr.Release()
127+
128+
bldr.AppendValues([]string{"hello", "world"}, nil)
129+
chunk1 := bldr.NewStringArray()
130+
defer chunk1.Release()
131+
132+
bldr.AppendValues([]string{"arrow", "parquet"}, nil)
133+
chunk2 := bldr.NewStringArray()
134+
defer chunk2.Release()
135+
136+
chunked := arrow.NewChunked(arrow.BinaryTypes.String, []arrow.Array{chunk1, chunk2})
137+
defer chunked.Release()
138+
139+
result, err := chunksToSingle(chunked, mem)
140+
require.NoError(t, err)
141+
defer result.Release()
142+
143+
assert.Equal(t, 4, result.Len())
144+
145+
resultArr := array.MakeFromData(result).(*array.String)
146+
defer resultArr.Release()
147+
assert.Equal(t, "hello", resultArr.Value(0))
148+
assert.Equal(t, "parquet", resultArr.Value(3))
149+
})
150+
}

parquet/pqarrow/file_reader.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
527527
parallel: fr.Props.Parallel,
528528
sc: sc,
529529
fieldReaders: readers,
530+
mem: fr.mem,
530531
}
531532
rr.refCount.Add(1)
532533
return rr, nil
@@ -721,6 +722,7 @@ type recordReader struct {
721722
fieldReaders []*ColumnReader
722723
cur arrow.RecordBatch
723724
err error
725+
mem memory.Allocator
724726

725727
refCount atomic.Int64
726728
}
@@ -789,7 +791,7 @@ func (r *recordReader) next() bool {
789791
return io.EOF
790792
}
791793

792-
arrdata, err := chunksToSingle(data)
794+
arrdata, err := chunksToSingle(data, r.mem)
793795
if err != nil {
794796
return err
795797
}

0 commit comments

Comments
 (0)