File tree Expand file tree Collapse file tree 1 file changed +16
-1
lines changed
src/query/pipeline/sources/src/processors/sources/input_formats/impls Expand file tree Collapse file tree 1 file changed +16
-1
lines changed Original file line number Diff line number Diff line change @@ -280,6 +280,7 @@ impl RowGroupInMemory {
280
280
) ?;
281
281
column_chunks. push ( array_iters) ;
282
282
}
283
+
283
284
match RowGroupDeserializer :: new ( column_chunks, self . meta . num_rows ( ) , None ) . next ( ) {
284
285
None => Err ( ErrorCode :: Internal (
285
286
"deserialize from raw group: fail to get a chunk" ,
@@ -333,7 +334,21 @@ impl BlockBuilderTrait for ParquetBlockBuilder {
333
334
if let Some ( rg) = batch. as_mut ( ) {
334
335
let chunk = rg. get_arrow_chunk ( ) ?;
335
336
let block = DataBlock :: from_chunk ( & self . ctx . schema , & chunk) ?;
336
- Ok ( vec ! [ block] )
337
+
338
+ let block_total_rows = block. num_rows ( ) ;
339
+ let num_rows_per_block = self . ctx . block_compact_thresholds . max_rows_per_block ;
340
+ let blocks: Vec < DataBlock > = ( 0 ..block_total_rows)
341
+ . step_by ( num_rows_per_block)
342
+ . map ( |idx| {
343
+ if idx + num_rows_per_block < block_total_rows {
344
+ block. slice ( idx, num_rows_per_block)
345
+ } else {
346
+ block. slice ( idx, block_total_rows - idx)
347
+ }
348
+ } )
349
+ . collect ( ) ;
350
+
351
+ Ok ( blocks)
337
352
} else {
338
353
Ok ( vec ! [ ] )
339
354
}
You can’t perform that action at this time.
0 commit comments