
Commit 3913ccc

remove independent function
1 parent e148808 commit 3913ccc

3 files changed: 14 additions & 87 deletions

src/query/datablocks/src/data_block.rs

Lines changed: 0 additions & 45 deletions
@@ -292,51 +292,6 @@ impl DataBlock {
         Ok(DataBlock::create(schema.clone(), columns))
     }
 
-    pub fn from_chunk_with_row_limit<A: AsRef<dyn Array>>(
-        schema: &DataSchemaRef,
-        chuck: &Chunk<A>,
-        num_rows: usize,
-    ) -> Result<Vec<DataBlock>> {
-        let columns: Vec<ColumnRef> = chuck
-            .columns()
-            .iter()
-            .zip(schema.fields().iter())
-            .map(|(col, f)| match f.is_nullable() {
-                true => col.into_nullable_column(),
-                false => col.into_column(),
-            })
-            .collect();
-
-        let block = DataBlock::create(schema.clone(), columns);
-
-        let total_rows_in_block: usize = block.num_rows();
-
-        if total_rows_in_block >= num_rows {
-            let num_blocks = if total_rows_in_block % num_rows == 0 {
-                total_rows_in_block / num_rows
-            } else {
-                total_rows_in_block / num_rows + 1
-            };
-            let mut blocks: Vec<DataBlock> = Vec::with_capacity(num_blocks);
-
-            let mut offset = 0;
-            let mut remain_rows = total_rows_in_block;
-            while remain_rows >= num_rows {
-                let cut = block.slice(offset, num_rows);
-                blocks.push(cut);
-                offset += num_rows;
-                remain_rows -= num_rows;
-            }
-            if remain_rows > 0 {
-                blocks.push(block.slice(offset, remain_rows));
-            }
-
-            Ok(blocks)
-        } else {
-            Ok(vec![block])
-        }
-    }
-
     pub fn get_serializers(&self) -> Result<Vec<TypeSerializerImpl>> {
         let columns_size = self.num_columns();

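For reference, the removed helper capped each output block at num_rows rows: it pre-sized the result with ceiling division, emitted full blocks in a while loop, and pushed one remainder block at the end. The standalone sketch below is not part of the commit; split_row_limit and its plain-usize signature are illustrative names only, used to reproduce just that partitioning arithmetic so the expected block sizes are easy to check.

// Illustrative sketch only: mirrors the partitioning arithmetic of the removed
// DataBlock::from_chunk_with_row_limit, over plain row counts instead of
// DataBlocks. `split_row_limit` is a hypothetical name, not a Databend API.
fn split_row_limit(total_rows: usize, num_rows: usize) -> Vec<usize> {
    if total_rows < num_rows {
        // Fewer rows than the limit: the helper returned the block unchanged.
        return vec![total_rows];
    }
    // Same ceiling division the helper used to pre-size its Vec.
    let num_blocks = if total_rows % num_rows == 0 {
        total_rows / num_rows
    } else {
        total_rows / num_rows + 1
    };
    let mut sizes = Vec::with_capacity(num_blocks);
    let mut remain_rows = total_rows;
    while remain_rows >= num_rows {
        sizes.push(num_rows);
        remain_rows -= num_rows;
    }
    if remain_rows > 0 {
        sizes.push(remain_rows);
    }
    sizes
}

fn main() {
    // Five rows with a two-row limit: blocks of 2, 2 and 1 rows, which is
    // what the deleted unit test below asserted.
    assert_eq!(split_row_limit(5, 2), vec![2, 2, 1]);
    assert_eq!(split_row_limit(3, 10), vec![3]);
}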
src/query/datablocks/tests/it/data_block.rs

Lines changed: 0 additions & 36 deletions
@@ -71,39 +71,3 @@ fn test_data_block_convert() -> Result<()> {
 
     Ok(())
 }
-
-#[test]
-fn test_data_block_from_chunk_with_row_limit() -> Result<()> {
-    let schema = DataSchemaRefExt::create(vec![
-        DataField::new("a", i32::to_data_type()),
-        DataField::new("b", i32::to_data_type()),
-    ]);
-
-    let block = DataBlock::create(schema.clone(), vec![
-        Series::from_data(vec![1i32, 2, 3, 4, 5]),
-        Series::from_data(vec![1i32, 2, 3, 4, 5]),
-    ]);
-
-    assert_eq!(&schema, block.schema());
-    assert_eq!(5, block.num_rows());
-    assert_eq!(2, block.num_columns());
-
-    let chunk: Chunk<ArrayRef> = block.try_into().unwrap();
-
-    // first and last test.
-    assert_eq!(5, chunk.len());
-    assert_eq!(2, chunk.columns().len());
-
-    let new_blocks: Vec<DataBlock> =
-        DataBlock::from_chunk_with_row_limit(&schema, &chunk, 2).unwrap();
-
-    assert_eq!(3, new_blocks.len());
-    // first and last block test.
-    assert_eq!(2, new_blocks[0].num_rows());
-    assert_eq!(1, new_blocks[2].num_rows());
-
-    assert_eq!(2, new_blocks[0].num_columns());
-    assert_eq!(2, new_blocks[2].num_columns());
-
-    Ok(())
-}

src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs

Lines changed: 14 additions & 6 deletions
@@ -333,12 +333,20 @@ impl BlockBuilderTrait for ParquetBlockBuilder {
     fn deserialize(&mut self, mut batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
         if let Some(rg) = batch.as_mut() {
            let chunk = rg.get_arrow_chunk()?;
-            // Using `block_compact_thresholds` is enough here.
-            let blocks = DataBlock::from_chunk_with_row_limit(
-                &self.ctx.schema,
-                &chunk,
-                self.ctx.block_compact_thresholds.max_rows_per_block,
-            )?;
+            let block = DataBlock::from_chunk(&self.ctx.schema, &chunk)?;
+
+            let block_total_rows = block.num_rows();
+            let num_rows_per_block = self.ctx.block_compact_thresholds.max_rows_per_block;
+            let blocks: Vec<DataBlock> = (0..block_total_rows)
+                .step_by(num_rows_per_block)
+                .map(|idx| {
+                    if idx + num_rows_per_block < block_total_rows {
+                        block.slice(idx, num_rows_per_block)
+                    } else {
+                        block.slice(idx, block_total_rows - idx)
+                    }
+                })
+                .collect();
 
             Ok(blocks)
         } else {

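The inlined replacement walks the row range with step_by and slices directly off the single block built by DataBlock::from_chunk. As a rough equivalence check, the standalone sketch below (not project code; slice_ranges is an illustrative name) reproduces the (offset, length) pairs that the new map closure passes to block.slice.

// Illustrative sketch only: the (offset, length) pairs produced by the new
// step_by-based partitioning in ParquetBlockBuilder::deserialize.
// `slice_ranges` is a hypothetical name, not a Databend API.
fn slice_ranges(block_total_rows: usize, num_rows_per_block: usize) -> Vec<(usize, usize)> {
    (0..block_total_rows)
        .step_by(num_rows_per_block)
        .map(|idx| {
            if idx + num_rows_per_block < block_total_rows {
                (idx, num_rows_per_block)
            } else {
                // The last slice takes whatever rows remain.
                (idx, block_total_rows - idx)
            }
        })
        .collect()
}

fn main() {
    // Same 5-row, 2-rows-per-block case as the deleted test:
    // slices at offsets 0, 2 and 4 with lengths 2, 2 and 1.
    assert_eq!(slice_ranges(5, 2), vec![(0, 2), (2, 2), (4, 1)]);
    // An exact multiple produces equal-sized slices with no remainder block.
    assert_eq!(slice_ranges(4, 2), vec![(0, 2), (2, 2)]);
}

One observable difference in this sketch: an empty chunk now yields no blocks at all (the range 0..0 is empty), whereas the removed from_chunk_with_row_limit returned a single zero-row block.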