
Commit e148808

add test for row limit
1 parent e244afc commit e148808

File tree: 2 files changed, +38 -3 lines


src/query/datablocks/tests/it/data_block.rs

Lines changed: 36 additions & 0 deletions
@@ -71,3 +71,39 @@ fn test_data_block_convert() -> Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn test_data_block_from_chunk_with_row_limit() -> Result<()> {
+    let schema = DataSchemaRefExt::create(vec![
+        DataField::new("a", i32::to_data_type()),
+        DataField::new("b", i32::to_data_type()),
+    ]);
+
+    let block = DataBlock::create(schema.clone(), vec![
+        Series::from_data(vec![1i32, 2, 3, 4, 5]),
+        Series::from_data(vec![1i32, 2, 3, 4, 5]),
+    ]);
+
+    assert_eq!(&schema, block.schema());
+    assert_eq!(5, block.num_rows());
+    assert_eq!(2, block.num_columns());
+
+    let chunk: Chunk<ArrayRef> = block.try_into().unwrap();
+
+    // first and last test.
+    assert_eq!(5, chunk.len());
+    assert_eq!(2, chunk.columns().len());
+
+    let new_blocks: Vec<DataBlock> =
+        DataBlock::from_chunk_with_row_limit(&schema, &chunk, 2).unwrap();
+
+    assert_eq!(3, new_blocks.len());
+    // first and last block test.
+    assert_eq!(2, new_blocks[0].num_rows());
+    assert_eq!(1, new_blocks[2].num_rows());
+
+    assert_eq!(2, new_blocks[0].num_columns());
+    assert_eq!(2, new_blocks[2].num_columns());
+
+    Ok(())
+}
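
For orientation, here is a minimal, self-contained sketch of the splitting arithmetic the test relies on. It is illustrative only and not the actual `from_chunk_with_row_limit` implementation: splitting `num_rows` rows with a `row_limit` yields ceil(num_rows / row_limit) blocks, each of `row_limit` rows except possibly the last.

// Illustrative sketch only (not the real DataBlock internals).
fn split_sizes(num_rows: usize, row_limit: usize) -> Vec<usize> {
    assert!(row_limit > 0, "row_limit must be positive");
    // Walk the rows in strides of `row_limit`; the final stride may be shorter.
    (0..num_rows)
        .step_by(row_limit)
        .map(|start| row_limit.min(num_rows - start))
        .collect()
}

fn main() {
    // Mirrors the test above: 5 rows with a limit of 2 give 3 blocks of 2, 2 and 1 rows.
    assert_eq!(split_sizes(5, 2), vec![2, 2, 1]);
}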

src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs

Lines changed: 2 additions & 3 deletions
@@ -62,8 +62,6 @@ use crate::processors::sources::input_formats::input_split::FileInfo;
 use crate::processors::sources::input_formats::input_split::SplitInfo;
 use crate::processors::sources::input_formats::InputFormat;
 
-pub const DEFAULT_ROW_PER_BLOCK: usize = 1000 * 1000;
-
 pub struct InputFormatParquet;
 
 fn col_offset(meta: &ColumnChunkMetaData) -> i64 {
@@ -335,10 +333,11 @@ impl BlockBuilderTrait for ParquetBlockBuilder {
     fn deserialize(&mut self, mut batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
         if let Some(rg) = batch.as_mut() {
             let chunk = rg.get_arrow_chunk()?;
+            // Using `block_compact_thresholds` is enough here.
             let blocks = DataBlock::from_chunk_with_row_limit(
                 &self.ctx.schema,
                 &chunk,
-                DEFAULT_ROW_PER_BLOCK,
+                self.ctx.block_compact_thresholds.max_rows_per_block,
             )?;
 
             Ok(blocks)
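
This change drops the hardcoded `DEFAULT_ROW_PER_BLOCK` constant (one million rows) and instead reads the cap from the builder context's `block_compact_thresholds`, so the parquet reader follows the per-query compaction settings rather than a compile-time constant. A rough sketch of that wiring follows; the struct shapes are assumed only from the field accesses visible in the diff and are not the real definitions.

// Hypothetical shapes, inferred from `self.ctx.block_compact_thresholds.max_rows_per_block`.
#[derive(Clone, Copy)]
struct BlockCompactThresholds {
    // Upper bound on rows per produced block; this value replaces the old
    // DEFAULT_ROW_PER_BLOCK constant (1000 * 1000).
    max_rows_per_block: usize,
}

struct BuilderCtx {
    block_compact_thresholds: BlockCompactThresholds,
}

fn main() {
    let ctx = BuilderCtx {
        block_compact_thresholds: BlockCompactThresholds {
            max_rows_per_block: 1000 * 1000,
        },
    };
    // deserialize() now passes this value as the row limit instead of a constant.
    let row_limit = ctx.block_compact_thresholds.max_rows_per_block;
    assert_eq!(row_limit, 1_000_000);
}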
