Skip to content

Commit 8d2afed

Browse files
committed
read batch size for native format deser
1 parent 8189ebf commit 8d2afed

File tree

3 files changed

+11
-16
lines changed

3 files changed

+11
-16
lines changed

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -139,7 +139,7 @@ impl AggIndexReader {
139139
}
140140
}
141141

142-
pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result<DataBlock> {
142+
pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result<Vec<DataBlock>> {
143143
let mut all_columns_arrays = vec![];
144144

145145
for (index, column_node) in self.reader.project_column_nodes.iter().enumerate() {
@@ -149,9 +149,9 @@ impl AggIndexReader {
149149
all_columns_arrays.push(arrays);
150150
}
151151
if all_columns_arrays.is_empty() {
152-
return Ok(DataBlock::empty_with_schema(Arc::new(
152+
return Ok(vec![DataBlock::empty_with_schema(Arc::new(
153153
self.reader.data_schema(),
154-
)));
154+
))]);
155155
}
156156
debug_assert!(all_columns_arrays
157157
.iter()
@@ -167,8 +167,6 @@ impl AggIndexReader {
167167
let block = DataBlock::new_from_columns(columns);
168168
blocks.push(block);
169169
}
170-
let blocks = self.apply_agg_info(blocks)?;
171-
172-
DataBlock::concat(blocks.as_slice())
170+
self.apply_agg_info(blocks)
173171
}
174172
}

src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -194,7 +194,7 @@ pub struct NativeDeserializeDataTransform {
194194
// Structures for driving the pipeline:
195195
input: Arc<InputPort>,
196196
output: Arc<OutputPort>,
197-
output_data: Option<DataBlock>,
197+
output_data: Vec<DataBlock>,
198198
parts: VecDeque<PartInfoPtr>,
199199
columns: VecDeque<NativeDataSource>,
200200
scan_progress: Arc<Progress>,
@@ -369,7 +369,7 @@ impl NativeDeserializeDataTransform {
369369
block_reader,
370370
input,
371371
output,
372-
output_data: None,
372+
output_data: vec![],
373373
parts: VecDeque::new(),
374374
columns: VecDeque::new(),
375375
prewhere_columns,
@@ -417,7 +417,7 @@ impl NativeDeserializeDataTransform {
417417
};
418418
self.scan_progress.incr(&progress_values);
419419
Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, data_block.memory_size());
420-
self.output_data = Some(data_block);
420+
self.output_data = vec![data_block];
421421
}
422422

423423
/// If the virtual column has been already generated, add it directly,
@@ -1049,7 +1049,7 @@ impl Processor for NativeDeserializeDataTransform {
10491049
return Ok(Event::NeedConsume);
10501050
}
10511051

1052-
if let Some(data_block) = self.output_data.take() {
1052+
if let Some(data_block) = self.output_data.pop() {
10531053
self.output.push_data(Ok(data_block));
10541054
return Ok(Event::NeedConsume);
10551055
}
@@ -1094,8 +1094,8 @@ impl Processor for NativeDeserializeDataTransform {
10941094
let columns = match columns {
10951095
NativeDataSource::AggIndex(data) => {
10961096
let agg_index_reader = self.index_reader.as_ref().as_ref().unwrap();
1097-
let block = agg_index_reader.deserialize_native_data(data)?;
1098-
self.output_data = Some(block);
1097+
let blocks = agg_index_reader.deserialize_native_data(data)?;
1098+
self.output_data = blocks;
10991099
self.finish_partition();
11001100
return Ok(());
11011101
}

src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -225,11 +225,8 @@ impl Processor for DeserializeDataTransform {
225225
return Ok(Event::NeedConsume);
226226
}
227227

228-
while let Some(data_block) = self.output_data.pop() {
228+
if let Some(data_block) = self.output_data.pop() {
229229
self.output.push_data(Ok(data_block));
230-
if !self.output.can_push() {
231-
break;
232-
}
233230
return Ok(Event::NeedConsume);
234231
}
235232

0 commit comments

Comments
 (0)