Commit 4dad9fa

refactor: Enables the new setting for Query only
1 parent cc8cfdd commit 4dad9fa

8 files changed (+37, -28 lines)

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs

Lines changed: 5 additions & 4 deletions
@@ -139,7 +139,7 @@ impl AggIndexReader {
         }
     }

-    pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result<Vec<DataBlock>> {
+    pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result<DataBlock> {
         let mut all_columns_arrays = vec![];

         for (index, column_node) in self.reader.project_column_nodes.iter().enumerate() {
@@ -149,9 +149,9 @@ impl AggIndexReader {
             all_columns_arrays.push(arrays);
         }
         if all_columns_arrays.is_empty() {
-            return Ok(vec![DataBlock::empty_with_schema(Arc::new(
+            return Ok(DataBlock::empty_with_schema(Arc::new(
                 self.reader.data_schema(),
-            ))]);
+            )));
         }
         debug_assert!(all_columns_arrays
             .iter()
@@ -167,6 +167,7 @@ impl AggIndexReader {
             let block = DataBlock::new_from_columns(columns);
             blocks.push(block);
         }
-        self.apply_agg_info(blocks)
+        let block = DataBlock::concat(&blocks)?;
+        self.apply_agg_info_to_block(block)
     }
 }
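The native path now returns a single DataBlock instead of a Vec: the per-page blocks are concatenated with DataBlock::concat and the aggregation info is applied once. A minimal standalone sketch of that merge-or-empty contract, using a toy Block type rather than Databend's DataBlock:

// Sketch of the new single-block contract: concatenate per-page blocks,
// or return an empty block when nothing was read. `Block` is an
// illustrative stand-in for DataBlock, not the Databend type.
#[derive(Debug, PartialEq)]
struct Block(Vec<i32>);

fn merge_blocks(blocks: Vec<Block>) -> Block {
    if blocks.is_empty() {
        // Mirrors the `DataBlock::empty_with_schema(...)` branch in the diff.
        return Block(Vec::new());
    }
    // Mirrors `DataBlock::concat(&blocks)?`: one block, rows kept in order.
    Block(blocks.into_iter().flat_map(|b| b.0).collect())
}

fn main() {
    assert_eq!(merge_blocks(vec![]), Block(vec![]));
    assert_eq!(
        merge_blocks(vec![Block(vec![1, 2]), Block(vec![3])]),
        Block(vec![1, 2, 3])
    );
}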

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs

Lines changed: 4 additions & 4 deletions
@@ -113,19 +113,19 @@ impl AggIndexReader {
         &self,
         part: PartInfoPtr,
         data: BlockReadResult,
-        batch_size: usize,
+        batch_size_hint: Option<usize>,
     ) -> Result<Vec<DataBlock>> {
         let columns_chunks = data.columns_chunks()?;
         let part = FuseBlockPartInfo::from_part(&part)?;
-        let block = self.reader.deserialize_parquet_to_blocks(
+        let blocks = self.reader.deserialize_parquet_to_blocks(
             part.nums_rows,
             &part.columns_meta,
             columns_chunks,
             &part.compression,
             &part.location,
-            batch_size,
+            batch_size_hint,
         )?;

-        self.apply_agg_info(block)
+        self.apply_agg_info(blocks)
     }
 }

src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs

Lines changed: 3 additions & 1 deletion
@@ -35,7 +35,7 @@ pub fn column_chunks_to_record_batch(
     num_rows: usize,
     column_chunks: &HashMap<ColumnId, DataItem>,
     compression: &Compression,
-    batch_size: usize,
+    batch_size: Option<usize>,
 ) -> databend_common_exception::Result<Vec<RecordBatch>> {
     let arrow_schema = Schema::from(original_schema);
     let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
@@ -68,6 +68,8 @@ pub fn column_chunks_to_record_batch(
         ProjectionMask::leaves(&parquet_schema, projection_mask),
         Some(arrow_schema.fields()),
     )?;
+
+    let batch_size = batch_size.unwrap_or(num_rows);
     let record_reader = ParquetRecordBatchReader::try_new_with_row_groups(
         &field_levels,
         row_group.as_ref(),
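The Option parameter makes the fallback explicit: with no hint, the whole block is decoded as a single record batch of num_rows rows. A small self-contained sketch of the same defaulting logic (split_into_batches is an illustrative helper, not the Databend function):

// Sketch of the Option-based batch size: None falls back to num_rows,
// so the whole block comes out as a single batch.
fn split_into_batches(num_rows: usize, batch_size: Option<usize>) -> Vec<usize> {
    let batch_size = batch_size.unwrap_or(num_rows).max(1);
    let mut sizes = Vec::new();
    let mut remaining = num_rows;
    while remaining > 0 {
        let n = remaining.min(batch_size);
        sizes.push(n);
        remaining -= n;
    }
    sizes
}

fn main() {
    // With a hint, rows are chunked.
    assert_eq!(split_into_batches(10, Some(4)), vec![4, 4, 2]);
    // Without a hint, everything is one batch.
    assert_eq!(split_into_batches(10, None), vec![10]);
}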

src/query/storages/fuse/src/io/read/block/parquet/mod.rs

Lines changed: 3 additions & 3 deletions
@@ -56,7 +56,7 @@ impl BlockReader {
             column_chunks,
             compression,
             block_path,
-            num_rows,
+            None,
         )?;
         // Defensive check: using `num_rows` as batch_size, expects only one block
         assert_eq!(blocks.len(), 1);
@@ -70,7 +70,7 @@ impl BlockReader {
         column_chunks: HashMap<ColumnId, DataItem>,
         compression: &Compression,
         block_path: &str,
-        batch_size: usize,
+        batch_size_hint: Option<usize>,
     ) -> Result<Vec<DataBlock>> {
         if column_chunks.is_empty() {
             return Ok(vec![self.build_default_values_block(num_rows)?]);
@@ -81,7 +81,7 @@ impl BlockReader {
             num_rows,
             &column_chunks,
             compression,
-            batch_size,
+            batch_size_hint,
         )?;

         let name_paths = column_name_paths(&self.projection, &self.original_schema);
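The caller-side invariant behind the assert_eq! above: passing None as the hint must yield exactly one block. A quick standalone check of that expectation (chunk_counts is an illustrative helper, not the real reader):

// With no hint, the chunk size defaults to num_rows, so exactly one
// batch comes back; this mirrors assert_eq!(blocks.len(), 1) above.
fn chunk_counts(num_rows: usize, batch_size_hint: Option<usize>) -> usize {
    let batch = batch_size_hint.unwrap_or(num_rows).max(1);
    num_rows.div_ceil(batch)
}

fn main() {
    assert_eq!(chunk_counts(4096, None), 1); // single-block fast path
    assert_eq!(chunk_counts(4096, Some(1024)), 4); // hinted batching
}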

src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs

Lines changed: 2 additions & 2 deletions
@@ -136,7 +136,7 @@ impl VirtualColumnReader {
     pub fn try_create_paster(
         &self,
         virtual_data: Option<VirtualBlockReadResult>,
-        batch_size: usize,
+        batch_size_hint: Option<usize>,
     ) -> Result<VirtualColumnDataPaster> {
         let record_batches = if let Some(virtual_data) = virtual_data {
             let columns_chunks = virtual_data.data.columns_chunks()?;
@@ -145,7 +145,7 @@ impl VirtualColumnReader {
                 virtual_data.num_rows,
                 &columns_chunks,
                 &virtual_data.compression,
-                batch_size,
+                batch_size_hint,
             )?;
             Some(chunks)
         } else {

src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs

Lines changed: 2 additions & 2 deletions
@@ -201,7 +201,7 @@ impl MatchedAggregator {
                     .insert(offset as usize)
                 {
                     return Err(ErrorCode::UnresolvableConflict(
-                        "multi rows from source match one and the same row in the target_table multi times",
+                        "1 multi rows from source match one and the same row in the target_table multi times",
                     ));
                 }
             }
@@ -335,7 +335,7 @@ impl MatchedAggregator {
                     < update_modified_offsets.len() + delete_modified_offsets.len()
                 {
                     return Err(ErrorCode::UnresolvableConflict(
-                        "multi rows from source match one and the same row in the target_table multi times",
+                        "2 multi rows from source match one and the same row in the target_table multi times",
                     ));
                 }

src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs

Lines changed: 6 additions & 6 deletions
@@ -194,7 +194,7 @@ pub struct NativeDeserializeDataTransform {
     // Structures for driving the pipeline:
     input: Arc<InputPort>,
     output: Arc<OutputPort>,
-    output_data: Vec<DataBlock>,
+    output_data: Option<DataBlock>,
     parts: VecDeque<PartInfoPtr>,
     columns: VecDeque<NativeDataSource>,
     scan_progress: Arc<Progress>,
@@ -369,7 +369,7 @@ impl NativeDeserializeDataTransform {
             block_reader,
             input,
             output,
-            output_data: vec![],
+            output_data: None,
             parts: VecDeque::new(),
             columns: VecDeque::new(),
             prewhere_columns,
@@ -417,7 +417,7 @@ impl NativeDeserializeDataTransform {
        };
        self.scan_progress.incr(&progress_values);
        Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, data_block.memory_size());
-        self.output_data = vec![data_block];
+        self.output_data = Some(data_block);
    }

    /// If the virtual column has been already generated, add it directly,
@@ -1049,7 +1049,7 @@ impl Processor for NativeDeserializeDataTransform {
            return Ok(Event::NeedConsume);
        }

-        if let Some(data_block) = self.output_data.pop() {
+        if let Some(data_block) = self.output_data.take() {
            self.output.push_data(Ok(data_block));
            return Ok(Event::NeedConsume);
        }
@@ -1094,8 +1094,8 @@ impl Processor for NativeDeserializeDataTransform {
        let columns = match columns {
            NativeDataSource::AggIndex(data) => {
                let agg_index_reader = self.index_reader.as_ref().as_ref().unwrap();
-                let blocks = agg_index_reader.deserialize_native_data(data)?;
-                self.output_data = blocks;
+                let block = agg_index_reader.deserialize_native_data(data)?;
+                self.output_data = Some(block);
                self.finish_partition();
                return Ok(());
            }
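Since the agg-index path now yields a single block, the processor's output slot becomes an Option drained with take() rather than a Vec drained with pop(). A standalone sketch of that single-slot pattern (the Processor type below is a stand-in, not the actual transform):

// Single-slot output drained with Option::take(), replacing the old
// Vec::pop() stack. Standalone illustration only.
struct Processor {
    output_data: Option<String>,
}

impl Processor {
    fn on_block_ready(&mut self, block: String) {
        // At most one pending block at a time.
        self.output_data = Some(block);
    }

    fn drain(&mut self) -> Option<String> {
        // take() moves the block out and leaves None behind,
        // just like `self.output_data.take()` in the diff.
        self.output_data.take()
    }
}

fn main() {
    let mut p = Processor { output_data: None };
    p.on_block_ready("block-1".to_string());
    assert_eq!(p.drain(), Some("block-1".to_string()));
    assert_eq!(p.drain(), None);
}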

src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs

Lines changed: 12 additions & 6 deletions
@@ -23,6 +23,7 @@ use databend_common_base::runtime::profile::Profile;
 use databend_common_base::runtime::profile::ProfileStatisticsName;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::PartInfoPtr;
+use databend_common_catalog::query_kind::QueryKind;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
@@ -80,7 +81,7 @@ pub struct DeserializeDataTransform {
     need_wait_runtime_filter: bool,
     runtime_filter_ready: Option<Arc<RuntimeFilterReady>>,

-    batch_size: usize,
+    batch_size_hint: Option<usize>,
 }

 unsafe impl Send for DeserializeDataTransform {}
@@ -99,6 +100,12 @@ impl DeserializeDataTransform {
         let need_wait_runtime_filter =
             !ctx.get_cluster().is_empty() && ctx.get_wait_runtime_filter(plan.scan_id);

+        // Unfortunately, the batch size hint is only safe for Query now.
+        let batch_size_hint = match ctx.get_query_kind() {
+            QueryKind::Query => Some(ctx.get_settings().get_fuse_parquet_read_batch_size()?),
+            _ => None,
+        };
+
         let mut src_schema: DataSchema = (block_reader.schema().as_ref()).into();
         if let Some(virtual_reader) = virtual_reader.as_ref() {
             let mut fields = src_schema.fields().clone();
@@ -117,7 +124,6 @@ impl DeserializeDataTransform {
         let output_schema: DataSchema = (&output_schema).into();
         let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index);

-        let batch_size = ctx.get_settings().get_fuse_parquet_read_batch_size()?;
         Ok(ProcessorPtr::create(Box::new(DeserializeDataTransform {
             ctx,
             table_index: plan.table_index,
@@ -138,7 +144,7 @@ impl DeserializeDataTransform {
             need_reserve_block_info,
             need_wait_runtime_filter,
             runtime_filter_ready: None,
-            batch_size,
+            batch_size_hint,
         })))
     }

@@ -270,7 +276,7 @@ impl Processor for DeserializeDataTransform {
                 let blocks = agg_index_reader.deserialize_parquet_data(
                     actual_part,
                     data,
-                    self.batch_size,
+                    self.batch_size_hint,
                 )?;

                 self.update_scan_metrics(blocks.as_slice());
@@ -289,13 +295,13 @@ impl Processor for DeserializeDataTransform {
                     columns_chunks,
                     &part.compression,
                     &part.location,
-                    self.batch_size,
+                    self.batch_size_hint,
                 )?;

                 let mut virtual_columns_paster =
                     if let Some(virtual_column_reader) = self.virtual_reader.as_ref() {
                         let record_batches = virtual_column_reader
-                            .try_create_paster(virtual_data, self.batch_size)?;
+                            .try_create_paster(virtual_data, self.batch_size_hint)?;
                         Some(record_batches)
                     } else {
                         None
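The gating named in the commit title is this match on the query kind: only QueryKind::Query reads fuse_parquet_read_batch_size; every other statement kind gets no hint and keeps the old one-block-per-part behavior. A standalone sketch of the same gate (the enum and the settings lookup below are stand-ins for the Databend catalog/settings APIs):

// Query-only gating for the batch size hint. QueryKind and
// batch_size_setting are illustrative stand-ins for
// databend_common_catalog::query_kind::QueryKind and
// Settings::get_fuse_parquet_read_batch_size.
#[allow(dead_code)]
enum QueryKind {
    Query,
    Insert,
    CopyIntoTable,
    Other,
}

fn batch_size_setting() -> usize {
    8192 // assumed example value for the setting
}

fn batch_size_hint(kind: &QueryKind) -> Option<usize> {
    match kind {
        QueryKind::Query => Some(batch_size_setting()),
        _ => None, // non-Query statements: no hint, whole block per batch
    }
}

fn main() {
    assert_eq!(batch_size_hint(&QueryKind::Query), Some(8192));
    assert_eq!(batch_size_hint(&QueryKind::Insert), None);
}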
