@@ -14,6 +14,7 @@
 
 use std::collections::HashMap;
 
+use arrow_array::Array;
 use arrow_array::ArrayRef;
 use arrow_array::RecordBatch;
 use arrow_array::StructArray;
@@ -94,6 +95,7 @@ impl BlockReader {
 
         let mut blocks = Vec::with_capacity(record_batches.len());
 
+        let mut offset = 0;
         for record_batch in record_batches {
             let num_rows_record_batch = record_batch.num_rows();
             let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
@@ -118,7 +120,7 @@ impl BlockReader {
                // corresponding field, and a default value has been declared for
                // the corresponding field.
                //
-               // Yes, it is too obscure, we need to polish it SOON.
+               // It is too confusing, we need to polish it SOON.
 
                let value = match column_chunks.get(&field.column_id) {
                    Some(DataItem::RawData(data)) => {
@@ -140,19 +142,21 @@ impl BlockReader {
                        Value::from_arrow_rs(arrow_array, &data_type)?
                    }
                    Some(DataItem::ColumnArray(cached)) => {
-                       // TODO this is NOT correct!
                        if column_node.is_nested {
                            // a defensive check, should never happen
                            return Err(ErrorCode::StorageOther(
                                "unexpected nested field: nested leaf field hits cached",
                            ));
                        }
-                       Value::from_arrow_rs(cached.0.clone(), &data_type)?
+                       let array = cached.0.slice(offset, record_batch.num_rows());
+                       Value::from_arrow_rs(array, &data_type)?
                    }
                    None => Value::Scalar(self.default_vals[i].clone()),
                };
                columns.push(BlockEntry::new(data_type, value));
            }
+
+           offset += record_batch.num_rows();
            blocks.push(DataBlock::new(columns, num_rows_record_batch));
        }
 
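For readers skimming the diff: the fix replaces the whole-array clone of a cached column (DataItem::ColumnArray) with a window sliced out by a running offset, so each DataBlock only sees the rows belonging to its own record batch. Below is a minimal, standalone sketch of that slicing pattern using arrow-rs; the array contents and batch sizes are invented for illustration, and this is not the Databend code itself.

// Minimal sketch of the offset-based slicing pattern from the diff.
// Assumes only the arrow_array crate; data and batch sizes are made up.
use std::sync::Arc;

use arrow_array::Array;
use arrow_array::ArrayRef;
use arrow_array::Int32Array;

fn main() {
    // A column cached once for the whole block: 6 rows in a single array.
    let cached: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));

    // Row counts of the record batches the block was decoded into.
    let batch_rows = [2usize, 3, 1];

    // Walk the cached array with a running offset so each batch gets only
    // its own rows; Array::slice is zero-copy (it shares the underlying buffers),
    // so nothing is duplicated per batch.
    let mut offset = 0;
    for num_rows in batch_rows {
        let window: ArrayRef = cached.slice(offset, num_rows);
        assert_eq!(window.len(), num_rows);
        offset += num_rows;
    }
    assert_eq!(offset, cached.len());
}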