
Commit 2a7b27b

fix merge conflicts
1 parent 0d3a27c commit 2a7b27b

2 files changed: +49 -146 lines

src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs

Lines changed: 37 additions & 142 deletions
@@ -149,100 +149,20 @@ impl VirtualColumnReader {
         ))
     }

-    // pub fn deserialize_virtual_columns(
-    // &self,
-    // mut data_block: DataBlock,
-    // virtual_data: Option<VirtualBlockReadResult>,
-    //) -> Result<DataBlock> {
-    // let orig_schema = virtual_data
-    // .as_ref()
-    // .map(|virtual_data| virtual_data.schema.clone())
-    // .unwrap_or_default();
-    // let record_batch = virtual_data
-    // .map(|virtual_data| {
-    // let columns_chunks = virtual_data.data.columns_chunks()?;
-    // column_chunks_to_record_batch(
-    // &virtual_data.schema,
-    // virtual_data.num_rows,
-    // &columns_chunks,
-    // &virtual_data.compression,
-    // )
-    // })
-    // .transpose()?;
-
-    // // If the virtual column has already generated, add it directly,
-    // // otherwise extract it from the source column
-    // let func_ctx = self.ctx.get_function_context()?;
-    // for virtual_column_field in self.virtual_column_info.virtual_column_fields.iter() {
-    // let name = format!("{}", virtual_column_field.column_id);
-    // if let Some(arrow_array) = record_batch
-    // .as_ref()
-    // .and_then(|r| r.column_by_name(&name).cloned())
-    // {
-    // let orig_field = orig_schema.field_with_name(&name).unwrap();
-    // let orig_type: DataType = orig_field.data_type().into();
-    // let value = Value::Column(Column::from_arrow_rs(arrow_array, &orig_type)?);
-    // let data_type: DataType = virtual_column_field.data_type.as_ref().into();
-    // let column = if orig_type != data_type {
-    // let cast_func_name = format!(
-    // "to_{}",
-    // data_type.remove_nullable().to_string().to_lowercase()
-    // );
-    // let (cast_value, cast_data_type) = eval_function(
-    // None,
-    // &cast_func_name,
-    // [(value, orig_type)],
-    // &func_ctx,
-    // data_block.num_rows(),
-    // &BUILTIN_FUNCTIONS,
-    // )?;
-    // BlockEntry::new(cast_data_type, cast_value)
-    // } else {
-    // BlockEntry::new(data_type, value)
-    // };
-    // data_block.add_column(column);
-    // continue;
-    // }
-    // let src_index = self
-    // .source_schema
-    // .index_of(&virtual_column_field.source_name)
-    // .unwrap();
-    // let source = data_block.get_by_offset(src_index);
-    // let src_arg = (source.value.clone(), source.data_type.clone());
-    // let path_arg = (
-    // Value::Scalar(virtual_column_field.key_paths.clone()),
-    // DataType::String,
-    // );
-
-    // let (value, data_type) = eval_function(
-    // None,
-    // "get_by_keypath",
-    // [src_arg, path_arg],
-    // &func_ctx,
-    // data_block.num_rows(),
-    // &BUILTIN_FUNCTIONS,
-    // )?;
-
-    // let column = if let Some(cast_func_name) = &virtual_column_field.cast_func_name {
-    // let (cast_value, cast_data_type) = eval_function(
-    // None,
-    // cast_func_name,
-    // [(value, data_type)],
-    // &func_ctx,
-    // data_block.num_rows(),
-    // &BUILTIN_FUNCTIONS,
-    // )?;
-    // BlockEntry::new(cast_data_type, cast_value)
-    // } else {
-    // BlockEntry::new(data_type, value)
-    // };
-    // data_block.add_column(column);
-    // }
-
-    // Ok(data_block)
-    //}
-    /// Deserialize virtual column data into record batches, according to the `batch_size`.
-    pub fn try_create_paster(
+    /// Creates a VirtualColumnDataPaster that handles the integration of virtual column data into DataBlocks.
+    ///
+    /// This method prepares a paster object that can process virtual column data from virtual block
+    /// read result, and later merge this data into existing DataBlocks. It deserializes virtual
+    /// column data into record batches according to the optional batch size hint.
+    ///
+    /// # Arguments
+    /// * `virtual_data` - Optional virtual block read result containing the data to be processed
+    /// * `batch_size_hint` - Optional hint for controlling the size of generated record batches
+    ///
+    /// # Returns
+    /// * `Result<VirtualColumnDataPaster>` - A paster object that can merge virtual column data
+    ///   into DataBlocks, or an error if creation fails
+    pub fn try_create_virtual_column_paster(
        &self,
        virtual_data: Option<VirtualBlockReadResult>,
        batch_size_hint: Option<usize>,
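
The new doc comment above says the paster deserializes virtual column data into record batches according to the optional batch size hint. As a rough illustration of that hint only, here is a minimal, self-contained Rust sketch using stand-in types; `Row`, `RecordBatch`, and `split_into_batches` are hypothetical names for this sketch, not Databend's API. With no hint everything lands in a single batch; otherwise rows are chunked into batches of at most the hinted size.

// Stand-in types for this sketch only (not Databend's Arrow-based types).
type Row = Vec<String>;
type RecordBatch = Vec<Row>;

// Split deserialized rows into record batches according to an optional size hint.
fn split_into_batches(rows: Vec<Row>, batch_size_hint: Option<usize>) -> Vec<RecordBatch> {
    match batch_size_hint {
        // A positive hint: chunks of at most `batch_size` rows.
        Some(batch_size) if batch_size > 0 => rows
            .chunks(batch_size)
            .map(|chunk| chunk.to_vec())
            .collect(),
        // No hint (or a zero hint, in this sketch): one batch with everything.
        _ => vec![rows],
    }
}

fn main() {
    let rows: Vec<Row> = (0..10).map(|i| vec![i.to_string()]).collect();
    let batches = split_into_batches(rows, Some(4));
    // 10 rows with a hint of 4 yield batches of 4, 4 and 2 rows.
    assert_eq!(
        batches.iter().map(|b| b.len()).collect::<Vec<_>>(),
        vec![4, 4, 2]
    );
}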
@@ -292,7 +212,29 @@ pub struct VirtualColumnDataPaster {
 }

 impl VirtualColumnDataPaster {
-    /// Paste virtual column to `data_block` if necessary
+    /// Processes a DataBlock by adding virtual columns to it.
+    ///
+    /// This method enriches the provided DataBlock with virtual columns by either:
+    /// 1. Using pre-computed virtual column data from deserialized record batches if available
+    /// 2. Computing virtual column values on-the-fly from source columns
+    ///
+    /// For each virtual column field:
+    /// - If the corresponding data exists in record batches, it is extracted and added directly
+    /// - If not available in record batches, it is computed from source columns using path extraction
+    /// - Type casting is performed if the source data type doesn't match the target virtual column type
+    ///
+    /// The method tracks which record batch to use via an internal counter that advances with each call.
+    ///
+    /// # Arguments
+    /// * `data_block` - The input DataBlock to which virtual columns will be added
+    ///
+    /// # Returns
+    /// * `Result<DataBlock>` - The modified DataBlock containing the original columns plus virtual columns,
+    ///   or an error if the operation fails
+    ///
+    /// # Note
+    /// This method must be called sequentially for each data block. The internal state keeps track of
+    /// which pre-computed record batch to use for each call.
     pub fn paste_virtual_column(&mut self, mut data_block: DataBlock) -> Result<DataBlock> {
         let record_batch = if let Some(record_batches) = &self.record_batches {
             assert!(record_batches.len() > self.next_record_batch_index);
@@ -374,52 +316,5 @@ impl VirtualColumnDataPaster {
         }

         Ok(data_block)
-
-        // for virtual_column_field in self.virtual_column_fields.iter() {
-        // if let Some(arrow_array) =
-        // record_batch.and_then(|r| r.column_by_name(&virtual_column_field.name).cloned())
-        // {
-        // let data_type: DataType = virtual_column_field.data_type.as_ref().into();
-        // let value = Value::Column(Column::from_arrow_rs(arrow_array, &data_type)?);
-        // data_block.add_column(BlockEntry::new(data_type, value));
-        // continue;
-        // }
-        // let src_index = self
-        // .source_schema
-        // .index_of(&virtual_column_field.source_name)
-        // .unwrap();
-        // let source = data_block.get_by_offset(src_index);
-        // let src_arg = (source.value.clone(), source.data_type.clone());
-        // let path_arg = (
-        // Value::Scalar(virtual_column_field.key_paths.clone()),
-        // DataType::String,
-        // );
-
-        // let (value, data_type) = eval_function(
-        // None,
-        // "get_by_keypath",
-        // [src_arg, path_arg],
-        // func_ctx,
-        // data_block.num_rows(),
-        // &BUILTIN_FUNCTIONS,
-        // )?;
-
-        // let column = if let Some(cast_func_name) = &virtual_column_field.cast_func_name {
-        // let (cast_value, cast_data_type) = eval_function(
-        // None,
-        // cast_func_name,
-        // [(value, data_type)],
-        // func_ctx,
-        // data_block.num_rows(),
-        // &BUILTIN_FUNCTIONS,
-        // )?;
-        // BlockEntry::new(cast_data_type, cast_value)
-        // } else {
-        // BlockEntry::new(data_type, value)
-        // };
-        // data_block.add_column(column);
-        //}
-
-        // Ok(data_block)
     }
 }
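
The documentation added on `paste_virtual_column` describes a stateful pattern: an internal index selects the pre-computed record batch for each incoming block, with on-the-fly computation as a fallback. The following is a minimal, self-contained sketch of that pattern only; `SketchPaster`, `SketchBlock`, and `SketchBatch` are illustrative stand-ins rather than Databend's `VirtualColumnDataPaster`, `DataBlock`, or Arrow record batches, and the fallback uses a trivial transformation in place of key-path extraction and casting.

// Stand-in for one pre-computed batch of virtual column values.
struct SketchBatch {
    column: Vec<i64>,
}

// Stand-in for a data block: a list of columns.
struct SketchBlock {
    columns: Vec<Vec<i64>>,
}

struct SketchPaster {
    // Pre-computed virtual column data, one batch per incoming block, if any.
    record_batches: Option<Vec<SketchBatch>>,
    // Advances by one on every call, so blocks must be pasted in order.
    next_record_batch_index: usize,
}

impl SketchPaster {
    fn paste_virtual_column(&mut self, mut block: SketchBlock) -> SketchBlock {
        let virtual_column = match &self.record_batches {
            // Pre-computed data exists: take the batch matching this block.
            Some(batches) => {
                assert!(batches.len() > self.next_record_batch_index);
                batches[self.next_record_batch_index].column.clone()
            }
            // Otherwise derive the virtual column from a source column on the fly
            // (a trivial transformation standing in for key-path extraction).
            None => block.columns[0].iter().map(|v| v * 10).collect(),
        };
        self.next_record_batch_index += 1;
        block.columns.push(virtual_column);
        block
    }
}

fn main() {
    let mut paster = SketchPaster {
        record_batches: None,
        next_record_batch_index: 0,
    };
    let block = SketchBlock {
        columns: vec![vec![1, 2, 3]],
    };
    let block = paster.paste_virtual_column(block);
    assert_eq!(block.columns[1], vec![10, 20, 30]);
}

Because the index advances on every call, blocks have to be fed to the paster in the order they were read, which is what the `# Note` section in the new doc comment warns about.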

src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs

Lines changed: 12 additions & 4 deletions
@@ -301,11 +301,15 @@ impl Processor for DeserializeDataTransform {
             self.batch_size_hint,
         )?;

+        // Initialize virtual column paster if needed: which will add virtual columns to
+        // each DataBlock. The paster is created from the VirtualColumnReader and maintains
+        // internal state to track which record batch of virtual column data to use for each DataBlock.
         let mut virtual_columns_paster =
             if let Some(virtual_column_reader) = self.virtual_reader.as_ref() {
-                let record_batches = virtual_column_reader
-                    .try_create_paster(virtual_data, self.batch_size_hint)?;
-                Some(record_batches)
+                Some(virtual_column_reader.try_create_virtual_column_paster(
+                    virtual_data,
+                    self.batch_size_hint,
+                )?)
             } else {
                 None
             };
@@ -331,7 +335,11 @@ impl Processor for DeserializeDataTransform {
             }
         }

-        // Add optional virtual columns
+        // Process virtual columns if available: This step enriches the DataBlock
+        // with virtual columns that were not originally present.
+        // The paster was created earlier from the VirtualColumnReader and maintains
+        // the state necessary to merge virtual columns into each data block in
+        // sequence, ensuring each block gets the correct corresponding virtual data.
         if let Some(virtual_columns_paster) = &mut virtual_columns_paster {
             data_block = virtual_columns_paster.paste_virtual_column(data_block)?;
         }
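
The comments added in this file describe the control flow around the paster: it is created once per deserialization task (only when a virtual column reader exists), and each data block is then passed through it in order. Below is a minimal, self-contained sketch of that flow with hypothetical `Block` and `Paster` types standing in for `DataBlock` and `VirtualColumnDataPaster`; it only shows the create-once, paste-per-block sequence, not the real reader API.

struct Block(Vec<i64>);

struct Paster {
    next_index: usize,
}

impl Paster {
    fn paste_virtual_column(&mut self, mut block: Block) -> Block {
        // Tag the block with its sequence number, standing in for pasting the
        // pre-computed virtual column data that corresponds to this block.
        block.0.push(self.next_index as i64);
        self.next_index += 1;
        block
    }
}

fn main() {
    let has_virtual_reader = true;

    // Created once, mirroring the single call to try_create_virtual_column_paster.
    let mut paster = if has_virtual_reader {
        Some(Paster { next_index: 0 })
    } else {
        None
    };

    let blocks = vec![Block(vec![1]), Block(vec![2])];
    let mut output = Vec::new();
    for mut block in blocks {
        // Mirrors the `if let Some(virtual_columns_paster)` guard in the diff above.
        if let Some(paster) = paster.as_mut() {
            block = paster.paste_virtual_column(block);
        }
        output.push(block);
    }

    assert_eq!(output[0].0, vec![1, 0]);
    assert_eq!(output[1].0, vec![2, 1]);
}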
