Commit 1701de7

refactor: Enables the new setting for Query only
1 parent 210274d commit 1701de7

8 files changed: +233 −43 lines changed

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs

Lines changed: 5 additions & 4 deletions
@@ -139,7 +139,7 @@ impl AggIndexReader {
         }
     }
 
-    pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result<Vec<DataBlock>> {
+    pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result<DataBlock> {
         let mut all_columns_arrays = vec![];
 
         for (index, column_node) in self.reader.project_column_nodes.iter().enumerate() {
@@ -149,9 +149,9 @@ impl AggIndexReader {
             all_columns_arrays.push(arrays);
         }
         if all_columns_arrays.is_empty() {
-            return Ok(vec![DataBlock::empty_with_schema(Arc::new(
+            return Ok(DataBlock::empty_with_schema(Arc::new(
                 self.reader.data_schema(),
-            ))]);
+            )));
         }
         debug_assert!(all_columns_arrays
             .iter()
@@ -167,6 +167,7 @@ impl AggIndexReader {
             let block = DataBlock::new_from_columns(columns);
             blocks.push(block);
         }
-        self.apply_agg_info(blocks)
+        let block = DataBlock::concat(&blocks)?;
+        self.apply_agg_info_to_block(block)
     }
 }
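Note: the native reader now returns a single DataBlock; per-page blocks are concatenated via DataBlock::concat before the agg-index info is applied. A minimal sketch of that shape, using a hypothetical stand-in type rather than Databend's real DataBlock:

    // Sketch only: `Block` stands in for databend's DataBlock; `concat` here
    // mimics the row-wise concatenation this hunk relies on.
    #[derive(Debug, PartialEq)]
    struct Block(Vec<i64>);

    impl Block {
        fn concat(blocks: &[Block]) -> Result<Block, String> {
            let mut rows = Vec::new();
            for b in blocks {
                rows.extend_from_slice(&b.0);
            }
            Ok(Block(rows))
        }
    }

    fn main() {
        // One block per native page, merged into one block per part.
        let pages = vec![Block(vec![1, 2]), Block(vec![3, 4, 5])];
        assert_eq!(Block::concat(&pages).unwrap(), Block(vec![1, 2, 3, 4, 5]));
    }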

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs

Lines changed: 4 additions & 4 deletions
@@ -113,19 +113,19 @@ impl AggIndexReader {
         &self,
         part: PartInfoPtr,
         data: BlockReadResult,
-        batch_size: usize,
+        batch_size_hint: Option<usize>,
     ) -> Result<Vec<DataBlock>> {
         let columns_chunks = data.columns_chunks()?;
         let part = FuseBlockPartInfo::from_part(&part)?;
-        let block = self.reader.deserialize_parquet_to_blocks(
+        let blocks = self.reader.deserialize_parquet_to_blocks(
             part.nums_rows,
             &part.columns_meta,
             columns_chunks,
             &part.compression,
             &part.location,
-            batch_size,
+            batch_size_hint,
         )?;
 
-        self.apply_agg_info(block)
+        self.apply_agg_info(blocks)
     }
 }

src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs

Lines changed: 3 additions & 1 deletion
@@ -35,7 +35,7 @@ pub fn column_chunks_to_record_batch(
     num_rows: usize,
     column_chunks: &HashMap<ColumnId, DataItem>,
     compression: &Compression,
-    batch_size: usize,
+    batch_size: Option<usize>,
 ) -> databend_common_exception::Result<Vec<RecordBatch>> {
     let arrow_schema = Schema::from(original_schema);
     let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
@@ -68,6 +68,8 @@ pub fn column_chunks_to_record_batch(
         ProjectionMask::leaves(&parquet_schema, projection_mask),
         Some(arrow_schema.fields()),
     )?;
+
+    let batch_size = batch_size.unwrap_or(num_rows);
     let record_reader = ParquetRecordBatchReader::try_new_with_row_groups(
         &field_levels,
         row_group.as_ref(),
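Note: the new Option<usize> parameter keeps the old behaviour as the default; with no hint, the whole part is read as one record batch of num_rows rows. A tiny illustration of the unwrap_or semantics (hypothetical helper, not part of the crate):

    fn effective_batch_size(batch_size: Option<usize>, num_rows: usize) -> usize {
        // Mirrors `batch_size.unwrap_or(num_rows)` in the hunk above:
        // `None` means "one batch spanning the whole part".
        batch_size.unwrap_or(num_rows)
    }

    fn main() {
        assert_eq!(effective_batch_size(None, 8192), 8192); // unchanged callers
        assert_eq!(effective_batch_size(Some(1024), 8192), 1024); // batched readers
    }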

src/query/storages/fuse/src/io/read/block/parquet/mod.rs

Lines changed: 3 additions & 3 deletions
@@ -56,7 +56,7 @@ impl BlockReader {
             column_chunks,
             compression,
             block_path,
-            num_rows,
+            None,
         )?;
         // Defensive check: using `num_rows` as batch_size, expects only one block
         assert_eq!(blocks.len(), 1);
@@ -70,7 +70,7 @@ impl BlockReader {
         column_chunks: HashMap<ColumnId, DataItem>,
         compression: &Compression,
         block_path: &str,
-        batch_size: usize,
+        batch_size_hint: Option<usize>,
     ) -> Result<Vec<DataBlock>> {
         if column_chunks.is_empty() {
             return Ok(vec![self.build_default_values_block(num_rows)?]);
@@ -81,7 +81,7 @@ impl BlockReader {
             num_rows,
             &column_chunks,
             compression,
-            batch_size,
+            batch_size_hint,
         )?;
 
         let name_paths = column_name_paths(&self.projection, &self.original_schema);
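Note: passing None is what keeps the defensive assert_eq!(blocks.len(), 1) above sound; with no hint the reader emits one batch of num_rows rows, hence exactly one DataBlock. A sketch of how a hint would partition a part (hypothetical helper, stated assumption):

    fn batch_row_counts(num_rows: usize, batch_size_hint: Option<usize>) -> Vec<usize> {
        // Row count of each record batch the reader would emit.
        let batch_size = batch_size_hint.unwrap_or(num_rows);
        let mut remaining = num_rows;
        let mut counts = Vec::new();
        while remaining > 0 {
            let n = remaining.min(batch_size);
            counts.push(n);
            remaining -= n;
        }
        counts
    }

    fn main() {
        assert_eq!(batch_row_counts(8192, None), vec![8192]); // exactly one block
        assert_eq!(batch_row_counts(5000, Some(2048)), vec![2048, 2048, 904]);
    }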

src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs

Lines changed: 198 additions & 17 deletions
@@ -149,37 +149,171 @@ impl VirtualColumnReader {
         ))
     }
 
-    pub fn deserialize_virtual_columns(
+    // pub fn deserialize_virtual_columns(
+    //     &self,
+    //     mut data_block: DataBlock,
+    //     virtual_data: Option<VirtualBlockReadResult>,
+    // ) -> Result<DataBlock> {
+    //     let orig_schema = virtual_data
+    //         .as_ref()
+    //         .map(|virtual_data| virtual_data.schema.clone())
+    //         .unwrap_or_default();
+    //     let record_batch = virtual_data
+    //         .map(|virtual_data| {
+    //             let columns_chunks = virtual_data.data.columns_chunks()?;
+    //             column_chunks_to_record_batch(
+    //                 &virtual_data.schema,
+    //                 virtual_data.num_rows,
+    //                 &columns_chunks,
+    //                 &virtual_data.compression,
+    //             )
+    //         })
+    //         .transpose()?;
+
+    //     // If the virtual column has already generated, add it directly,
+    //     // otherwise extract it from the source column
+    //     let func_ctx = self.ctx.get_function_context()?;
+    //     for virtual_column_field in self.virtual_column_info.virtual_column_fields.iter() {
+    //         let name = format!("{}", virtual_column_field.column_id);
+    //         if let Some(arrow_array) = record_batch
+    //             .as_ref()
+    //             .and_then(|r| r.column_by_name(&name).cloned())
+    //         {
+    //             let orig_field = orig_schema.field_with_name(&name).unwrap();
+    //             let orig_type: DataType = orig_field.data_type().into();
+    //             let value = Value::Column(Column::from_arrow_rs(arrow_array, &orig_type)?);
+    //             let data_type: DataType = virtual_column_field.data_type.as_ref().into();
+    //             let column = if orig_type != data_type {
+    //                 let cast_func_name = format!(
+    //                     "to_{}",
+    //                     data_type.remove_nullable().to_string().to_lowercase()
+    //                 );
+    //                 let (cast_value, cast_data_type) = eval_function(
+    //                     None,
+    //                     &cast_func_name,
+    //                     [(value, orig_type)],
+    //                     &func_ctx,
+    //                     data_block.num_rows(),
+    //                     &BUILTIN_FUNCTIONS,
+    //                 )?;
+    //                 BlockEntry::new(cast_data_type, cast_value)
+    //             } else {
+    //                 BlockEntry::new(data_type, value)
+    //             };
+    //             data_block.add_column(column);
+    //             continue;
+    //         }
+    //         let src_index = self
+    //             .source_schema
+    //             .index_of(&virtual_column_field.source_name)
+    //             .unwrap();
+    //         let source = data_block.get_by_offset(src_index);
+    //         let src_arg = (source.value.clone(), source.data_type.clone());
+    //         let path_arg = (
+    //             Value::Scalar(virtual_column_field.key_paths.clone()),
+    //             DataType::String,
+    //         );
+
+    //         let (value, data_type) = eval_function(
+    //             None,
+    //             "get_by_keypath",
+    //             [src_arg, path_arg],
+    //             &func_ctx,
+    //             data_block.num_rows(),
+    //             &BUILTIN_FUNCTIONS,
+    //         )?;
+
+    //         let column = if let Some(cast_func_name) = &virtual_column_field.cast_func_name {
+    //             let (cast_value, cast_data_type) = eval_function(
+    //                 None,
+    //                 cast_func_name,
+    //                 [(value, data_type)],
+    //                 &func_ctx,
+    //                 data_block.num_rows(),
+    //                 &BUILTIN_FUNCTIONS,
+    //             )?;
+    //             BlockEntry::new(cast_data_type, cast_value)
+    //         } else {
+    //             BlockEntry::new(data_type, value)
+    //         };
+    //         data_block.add_column(column);
+    //     }
+
+    //     Ok(data_block)
+    // }
+    /// Deserialize virtual column data into record batches, according to the `batch_size`.
+    pub fn try_create_paster(
         &self,
-        mut data_block: DataBlock,
         virtual_data: Option<VirtualBlockReadResult>,
-    ) -> Result<DataBlock> {
+        batch_size_hint: Option<usize>,
+    ) -> Result<VirtualColumnDataPaster> {
         let orig_schema = virtual_data
             .as_ref()
             .map(|virtual_data| virtual_data.schema.clone())
             .unwrap_or_default();
-        let record_batch = virtual_data
-            .map(|virtual_data| {
-                let columns_chunks = virtual_data.data.columns_chunks()?;
-                column_chunks_to_record_batch(
-                    &virtual_data.schema,
-                    virtual_data.num_rows,
-                    &columns_chunks,
-                    &virtual_data.compression,
-                )
-            })
-            .transpose()?;
+
+        let record_batches = if let Some(virtual_data) = virtual_data {
+            let columns_chunks = virtual_data.data.columns_chunks()?;
+            let chunks = column_chunks_to_record_batch(
+                &self.virtual_column_info.schema,
+                virtual_data.num_rows,
+                &columns_chunks,
+                &virtual_data.compression,
+                batch_size_hint,
+            )?;
+            Some(chunks)
+        } else {
+            None
+        };
+
+        let function_context = self.ctx.get_function_context()?;
+
+        // Unfortunately, Paster cannot hold references to the fields that being cloned,
+        // since the caller `DeserializeDataTransform` will take mutable reference of
+        // VirtualColumnReader indirectly.
+        Ok(VirtualColumnDataPaster {
+            record_batches,
+            function_context,
+            next_record_batch_index: 0,
+            virtual_column_fields: self.virtual_column_info.virtual_column_fields.clone(),
+            source_schema: self.source_schema.clone(),
+            orig_schema,
+        })
+    }
+}
+
+pub struct VirtualColumnDataPaster {
+    record_batches: Option<Vec<RecordBatch>>,
+    next_record_batch_index: usize,
+    function_context: FunctionContext,
+    virtual_column_fields: Vec<VirtualColumnField>,
+    source_schema: TableSchemaRef,
+    orig_schema: TableSchemaRef,
+}
+
+impl VirtualColumnDataPaster {
+    /// Paste virtual column to `data_block` if necessary
+    pub fn paste_virtual_column(&mut self, mut data_block: DataBlock) -> Result<DataBlock> {
+        let record_batch = if let Some(record_batches) = &self.record_batches {
+            assert!(record_batches.len() > self.next_record_batch_index);
+            Some(&record_batches[self.next_record_batch_index])
+        } else {
+            None
+        };
+
+        self.next_record_batch_index += 1;
+
+        let func_ctx = &self.function_context;
 
         // If the virtual column has already generated, add it directly,
         // otherwise extract it from the source column
-        let func_ctx = self.ctx.get_function_context()?;
-        for virtual_column_field in self.virtual_column_info.virtual_column_fields.iter() {
+        for virtual_column_field in self.virtual_column_fields.iter() {
             let name = format!("{}", virtual_column_field.column_id);
             if let Some(arrow_array) = record_batch
                 .as_ref()
                 .and_then(|r| r.column_by_name(&name).cloned())
             {
-                let orig_field = orig_schema.field_with_name(&name).unwrap();
+                let orig_field = self.orig_schema.field_with_name(&name).unwrap();
                 let orig_type: DataType = orig_field.data_type().into();
                 let value = Value::Column(Column::from_arrow_rs(arrow_array, &orig_type)?);
                 let data_type: DataType = virtual_column_field.data_type.as_ref().into();
@@ -240,5 +374,52 @@ impl VirtualColumnReader {
         }
 
         Ok(data_block)
+
+        // for virtual_column_field in self.virtual_column_fields.iter() {
+        //     if let Some(arrow_array) =
+        //         record_batch.and_then(|r| r.column_by_name(&virtual_column_field.name).cloned())
+        //     {
+        //         let data_type: DataType = virtual_column_field.data_type.as_ref().into();
+        //         let value = Value::Column(Column::from_arrow_rs(arrow_array, &data_type)?);
+        //         data_block.add_column(BlockEntry::new(data_type, value));
+        //         continue;
+        //     }
+        //     let src_index = self
+        //         .source_schema
+        //         .index_of(&virtual_column_field.source_name)
+        //         .unwrap();
+        //     let source = data_block.get_by_offset(src_index);
+        //     let src_arg = (source.value.clone(), source.data_type.clone());
+        //     let path_arg = (
+        //         Value::Scalar(virtual_column_field.key_paths.clone()),
+        //         DataType::String,
+        //     );
+
+        //     let (value, data_type) = eval_function(
+        //         None,
+        //         "get_by_keypath",
+        //         [src_arg, path_arg],
+        //         func_ctx,
+        //         data_block.num_rows(),
+        //         &BUILTIN_FUNCTIONS,
+        //     )?;
+
+        //     let column = if let Some(cast_func_name) = &virtual_column_field.cast_func_name {
+        //         let (cast_value, cast_data_type) = eval_function(
+        //             None,
+        //             cast_func_name,
+        //             [(value, data_type)],
+        //             func_ctx,
+        //             data_block.num_rows(),
+        //             &BUILTIN_FUNCTIONS,
+        //         )?;
+        //         BlockEntry::new(cast_data_type, cast_value)
+        //     } else {
+        //         BlockEntry::new(data_type, value)
+        //     };
+        //     data_block.add_column(column);
+        // }
+
+        // Ok(data_block)
     }
 }
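Note: the paster decouples deserialization from pasting. try_create_paster deserializes all record batches up front, cloning what it needs from the reader (per the code comment, because DeserializeDataTransform later takes a mutable reference to the reader), and paste_virtual_column then consumes one pre-built batch per incoming block. A toy sketch of that consume-by-index pattern (stand-in types, not the real API):

    struct Paster {
        // Stands in for Option<Vec<RecordBatch>>.
        batches: Option<Vec<String>>,
        next: usize,
    }

    impl Paster {
        // One pre-deserialized batch is consumed per incoming block.
        fn paste(&mut self, block: &str) -> String {
            let extra = self.batches.as_ref().map(|bs| {
                assert!(bs.len() > self.next); // same invariant as the patch
                bs[self.next].clone()
            });
            self.next += 1;
            match extra {
                Some(v) => format!("{block}+{v}"),
                None => block.to_string(), // no virtual data: pass through
            }
        }
    }

    fn main() {
        let mut p = Paster { batches: Some(vec!["v0".into(), "v1".into()]), next: 0 };
        assert_eq!(p.paste("b0"), "b0+v0");
        assert_eq!(p.paste("b1"), "b1+v1");
    }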

src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs

Lines changed: 2 additions & 2 deletions
@@ -201,7 +201,7 @@ impl MatchedAggregator {
                 .insert(offset as usize)
             {
                 return Err(ErrorCode::UnresolvableConflict(
-                    "multi rows from source match one and the same row in the target_table multi times",
+                    "1 multi rows from source match one and the same row in the target_table multi times",
                 ));
             }
         }
@@ -335,7 +335,7 @@ impl MatchedAggregator {
             < update_modified_offsets.len() + delete_modified_offsets.len()
         {
             return Err(ErrorCode::UnresolvableConflict(
-                "multi rows from source match one and the same row in the target_table multi times",
+                "2 multi rows from source match one and the same row in the target_table multi times",
            ));
        }
    }
