Skip to content

Commit e244afc

Browse files
committed
split parquet-deserialized DataBlock into multiple blocks by a default row limit
1 parent 4123682 commit e244afc

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

src/query/datablocks/src/data_block.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,51 @@ impl DataBlock {
292292
Ok(DataBlock::create(schema.clone(), columns))
293293
}
294294

295+
pub fn from_chunk_with_row_limit<A: AsRef<dyn Array>>(
296+
schema: &DataSchemaRef,
297+
chuck: &Chunk<A>,
298+
num_rows: usize,
299+
) -> Result<Vec<DataBlock>> {
300+
let columns: Vec<ColumnRef> = chuck
301+
.columns()
302+
.iter()
303+
.zip(schema.fields().iter())
304+
.map(|(col, f)| match f.is_nullable() {
305+
true => col.into_nullable_column(),
306+
false => col.into_column(),
307+
})
308+
.collect();
309+
310+
let block = DataBlock::create(schema.clone(), columns);
311+
312+
let total_rows_in_block: usize = block.num_rows();
313+
314+
if total_rows_in_block >= num_rows {
315+
let num_blocks = if total_rows_in_block % num_rows == 0 {
316+
total_rows_in_block / num_rows
317+
} else {
318+
total_rows_in_block / num_rows + 1
319+
};
320+
let mut blocks: Vec<DataBlock> = Vec::with_capacity(num_blocks);
321+
322+
let mut offset = 0;
323+
let mut remain_rows = total_rows_in_block;
324+
while remain_rows >= num_rows {
325+
let cut = block.slice(offset, num_rows);
326+
blocks.push(cut);
327+
offset += num_rows;
328+
remain_rows -= num_rows;
329+
}
330+
if remain_rows > 0 {
331+
blocks.push(block.slice(offset, remain_rows));
332+
}
333+
334+
Ok(blocks)
335+
} else {
336+
Ok(vec![block])
337+
}
338+
}
339+
295340
pub fn get_serializers(&self) -> Result<Vec<TypeSerializerImpl>> {
296341
let columns_size = self.num_columns();
297342

src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ use crate::processors::sources::input_formats::input_split::FileInfo;
6262
use crate::processors::sources::input_formats::input_split::SplitInfo;
6363
use crate::processors::sources::input_formats::InputFormat;
6464

65+
/// Default upper bound on the number of rows per `DataBlock` produced when
/// deserializing a parquet row group (1,000,000 rows).
pub const DEFAULT_ROW_PER_BLOCK: usize = 1000 * 1000;
66+
6567
pub struct InputFormatParquet;
6668

6769
fn col_offset(meta: &ColumnChunkMetaData) -> i64 {
@@ -280,6 +282,7 @@ impl RowGroupInMemory {
280282
)?;
281283
column_chunks.push(array_iters);
282284
}
285+
283286
match RowGroupDeserializer::new(column_chunks, self.meta.num_rows(), None).next() {
284287
None => Err(ErrorCode::Internal(
285288
"deserialize from raw group: fail to get a chunk",
@@ -332,8 +335,13 @@ impl BlockBuilderTrait for ParquetBlockBuilder {
332335
fn deserialize(&mut self, mut batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
333336
if let Some(rg) = batch.as_mut() {
334337
let chunk = rg.get_arrow_chunk()?;
335-
let block = DataBlock::from_chunk(&self.ctx.schema, &chunk)?;
336-
Ok(vec![block])
338+
let blocks = DataBlock::from_chunk_with_row_limit(
339+
&self.ctx.schema,
340+
&chunk,
341+
DEFAULT_ROW_PER_BLOCK,
342+
)?;
343+
344+
Ok(blocks)
337345
} else {
338346
Ok(vec![])
339347
}

0 commit comments

Comments
 (0)