diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index fced5a435ac8f..f50c787d993b0 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1333,6 +1333,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("fuse_parquet_read_batch_size", DefaultSettingValue { + value: UserSettingValue::UInt64(8192), + desc: "The batch size while deserializing fuse table with parquet storage format", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(1..=1_000_000)), + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 97468f1d0102c..11661dcd0d844 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -983,4 +983,8 @@ impl Settings { pub fn get_max_aggregate_restore_worker(&self) -> Result { self.try_get_u64("max_aggregate_restore_worker") } + + pub fn get_fuse_parquet_read_batch_size(&self) -> Result { + Ok(self.try_get_u64("fuse_parquet_read_batch_size")? as usize) + } } diff --git a/src/query/storages/common/cache/src/providers/disk_cache_builder.rs b/src/query/storages/common/cache/src/providers/disk_cache_builder.rs index 1b2ec1f0b2837..da64353cb236e 100644 --- a/src/query/storages/common/cache/src/providers/disk_cache_builder.rs +++ b/src/query/storages/common/cache/src/providers/disk_cache_builder.rs @@ -36,7 +36,7 @@ struct CacheItem { value: Bytes, } -#[derive(Clone)] +#[derive(Clone, Eq, PartialEq, Hash)] pub struct TableDataCacheKey { cache_key: String, } diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs index 124b0d4478493..3d7503781184c 100644 --- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs +++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs @@ -97,7 +97,7 @@ impl AggIndexReader { self.index_id } - pub(super) fn apply_agg_info(&self, block: DataBlock) -> Result { + pub(super) fn apply_agg_info_to_block(&self, block: DataBlock) -> Result { let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS); // 1. Filter the block if there is a filter. @@ -145,4 +145,11 @@ impl AggIndexReader { )), )) } + + pub(super) fn apply_agg_info(&self, block: Vec) -> Result> { + block + .into_iter() + .map(|block| self.apply_agg_info_to_block(block)) + .collect::>() + } } diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs index 8a76a72e7242d..fb44116705a03 100644 --- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs +++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::vec; use databend_common_exception::Result; use databend_common_expression::DataBlock; @@ -167,6 +168,6 @@ impl AggIndexReader { blocks.push(block); } let block = DataBlock::concat(&blocks)?; - self.apply_agg_info(block) + self.apply_agg_info_to_block(block) } } diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs index be2f4e3af0e24..51a5c251eed33 100644 --- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs +++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs @@ -113,17 +113,19 @@ impl AggIndexReader { &self, part: PartInfoPtr, data: BlockReadResult, - ) -> Result { + batch_size_hint: Option, + ) -> Result> { let columns_chunks = data.columns_chunks()?; let part = FuseBlockPartInfo::from_part(&part)?; - let block = self.reader.deserialize_parquet_chunks( + let blocks = self.reader.deserialize_parquet_to_blocks( part.nums_rows, &part.columns_meta, columns_chunks, &part.compression, &part.location, + batch_size_hint, )?; - self.apply_agg_info(block) + self.apply_agg_info(blocks) } } diff --git a/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs b/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs index 22907c2d03210..c5c2f4744c50c 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs @@ -19,6 +19,7 @@ use arrow_schema::Schema; use databend_common_expression::ColumnId; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::meta::Compression; +use itertools::Itertools; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use parquet::arrow::parquet_to_arrow_field_levels; use parquet::arrow::ArrowSchemaConverter; @@ -34,7 +35,8 @@ pub fn column_chunks_to_record_batch( num_rows: usize, column_chunks: &HashMap, compression: &Compression, -) -> databend_common_exception::Result { + batch_size: Option, +) -> databend_common_exception::Result> { let arrow_schema = Schema::from(original_schema); let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?; @@ -66,13 +68,19 @@ pub fn column_chunks_to_record_batch( ProjectionMask::leaves(&parquet_schema, projection_mask), Some(arrow_schema.fields()), )?; - let mut record_reader = ParquetRecordBatchReader::try_new_with_row_groups( + + let batch_size = batch_size.unwrap_or(num_rows); + let record_reader = ParquetRecordBatchReader::try_new_with_row_groups( &field_levels, row_group.as_ref(), - num_rows, + batch_size, None, )?; - let record = record_reader.next().unwrap()?; - assert!(record_reader.next().is_none()); - Ok(record) + + let records: Vec<_> = record_reader.try_collect()?; + assert_eq!( + num_rows, + records.iter().map(|r| r.num_rows()).sum::() + ); + Ok(records) } diff --git a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs index bd7ec4a71dbaa..71b75bcbbd9ae 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use arrow_array::Array; use arrow_array::ArrayRef; use arrow_array::RecordBatch; use arrow_array::StructArray; @@ -35,6 +36,7 @@ mod adapter; mod deserialize; pub use adapter::RowGroupImplBuilder; +use databend_common_exception::Result; pub use deserialize::column_chunks_to_record_batch; use crate::io::read::block::block_reader_merge_io::DataItem; @@ -48,17 +50,41 @@ impl BlockReader { column_chunks: HashMap, compression: &Compression, block_path: &str, - ) -> databend_common_exception::Result { + ) -> Result { + let mut blocks = self.deserialize_parquet_to_blocks( + num_rows, + column_metas, + column_chunks, + compression, + block_path, + None, + )?; + // Defensive check: using `num_rows` as batch_size, expects only one block + assert_eq!(blocks.len(), 1); + Ok(blocks.pop().unwrap()) + } + + pub(crate) fn deserialize_parquet_to_blocks( + &self, + num_rows: usize, + column_metas: &HashMap, + column_chunks: HashMap, + compression: &Compression, + block_path: &str, + batch_size_hint: Option, + ) -> Result> { if column_chunks.is_empty() { - return self.build_default_values_block(num_rows); + return Ok(vec![self.build_default_values_block(num_rows)?]); } - let record_batch = column_chunks_to_record_batch( + + let record_batches = column_chunks_to_record_batch( &self.original_schema, num_rows, &column_chunks, compression, + batch_size_hint, )?; - let mut columns = Vec::with_capacity(self.projected_schema.fields.len()); + let name_paths = column_name_paths(&self.projection, &self.original_schema); let array_cache = if self.put_cache { @@ -67,58 +93,89 @@ impl BlockReader { None }; - for ((i, field), column_node) in self - .projected_schema - .fields - .iter() - .enumerate() - .zip(self.project_column_nodes.iter()) - { - let data_type = field.data_type().into(); - - // NOTE, there is something tricky here: - // - `column_chunks` always contains data of leaf columns - // - here we may processing a nested type field - // - But, even if the field being processed is a field with multiple leaf columns - // `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1], - // even if we are getting data from `column_chunks` using a non-leaf - // `column_id` of `projected_schema.fields` - // - // [^1]: Except in the current block, there is no data stored for the - // corresponding field, and a default value has been declared for - // the corresponding field. - // - // Yes, it is too obscure, we need to polish it later. - - let value = match column_chunks.get(&field.column_id) { - Some(DataItem::RawData(data)) => { - // get the deserialized arrow array, which may be a nested array - let arrow_array = column_by_name(&record_batch, &name_paths[i]); - if !column_node.is_nested { - if let Some(cache) = &array_cache { + let mut blocks = Vec::with_capacity(record_batches.len()); + let mut array_cache_buffer = HashMap::with_capacity(record_batches.len()); + + let mut offset = 0; + for record_batch in record_batches { + let num_rows_record_batch = record_batch.num_rows(); + let mut columns = Vec::with_capacity(self.projected_schema.fields.len()); + for ((i, field), column_node) in self + .projected_schema + .fields + .iter() + .enumerate() + .zip(self.project_column_nodes.iter()) + { + let data_type = field.data_type().into(); + + // NOTE, there is something tricky here: + // - `column_chunks` always contains data of leaf columns + // - here we may processing a nested type field + // - But, even if the field being processed is a field with multiple leaf columns + // `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1], + // even if we are getting data from `column_chunks` using a non-leaf + // `column_id` of `projected_schema.fields` + // + // [^1]: Except in the current block, there is no data stored for the + // corresponding field, and a default value has been declared for + // the corresponding field. + // + // It is too confusing, we need to polish it SOON. + + let value = match column_chunks.get(&field.column_id) { + Some(DataItem::RawData(data)) => { + // get the deserialized arrow array, which may be a nested array + let arrow_array = column_by_name(&record_batch, &name_paths[i]); + if !column_node.is_nested && array_cache.is_some() { let meta = column_metas.get(&field.column_id).unwrap(); let (offset, len) = meta.offset_length(); let key = TableDataCacheKey::new(block_path, field.column_id, offset, len); - cache.insert(key.into(), (arrow_array.clone(), data.len())); + array_cache_buffer + .entry(key) + .and_modify(|v: &mut Vec<_>| { + v.push((arrow_array.clone(), data.len())) + }) + .or_insert(vec![(arrow_array.clone(), data.len())]); } + Value::from_arrow_rs(arrow_array, &data_type)? } - Value::from_arrow_rs(arrow_array, &data_type)? - } - Some(DataItem::ColumnArray(cached)) => { - if column_node.is_nested { - // a defensive check, should never happen - return Err(ErrorCode::StorageOther( - "unexpected nested field: nested leaf field hits cached", - )); + Some(DataItem::ColumnArray(cached)) => { + if column_node.is_nested { + // a defensive check, should never happen + return Err(ErrorCode::StorageOther( + "unexpected nested field: nested leaf field hits cached", + )); + } + let array = cached.0.slice(offset, record_batch.num_rows()); + Value::from_arrow_rs(array, &data_type)? } - Value::from_arrow_rs(cached.0.clone(), &data_type)? + None => Value::Scalar(self.default_vals[i].clone()), + }; + columns.push(BlockEntry::new(data_type, value)); + } + + offset += record_batch.num_rows(); + blocks.push(DataBlock::new(columns, num_rows_record_batch)); + } + + // TODO doc this + if let Some(array_cache) = &array_cache { + for (key, items) in array_cache_buffer { + let mut arrays = Vec::with_capacity(items.len()); + let mut len = 0; + for (array, size) in &items { + arrays.push(array.as_ref()); + len += size; } - None => Value::Scalar(self.default_vals[i].clone()), - }; - columns.push(BlockEntry::new(data_type, value)); + use arrow::compute::concat; + let result = concat(&arrays)?; + array_cache.insert(key.into(), (result, len)); + } } - Ok(DataBlock::new(columns, num_rows)) + + Ok(blocks) } } diff --git a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs index 1eb508ad38297..e3067636e38f1 100644 --- a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs +++ b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs @@ -15,6 +15,8 @@ use std::collections::HashSet; use std::sync::Arc; +use arrow_array::RecordBatch; +use databend_common_catalog::plan::VirtualColumnField; use databend_common_exception::Result; use databend_common_expression::eval_function; use databend_common_expression::types::DataType; @@ -22,6 +24,7 @@ use databend_common_expression::BlockEntry; use databend_common_expression::Column; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; +use databend_common_expression::FunctionContext; use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRef; use databend_common_expression::Value; @@ -146,37 +149,113 @@ impl VirtualColumnReader { )) } - pub fn deserialize_virtual_columns( + /// Creates a VirtualColumnDataPaster that handles the integration of virtual column data into DataBlocks. + /// + /// This method prepares a paster object that can process virtual column data from virtual block + /// read result, and later merge this data into existing DataBlocks. It deserializes virtual + /// column data into record batches according to the optional batch size hint. + /// + /// # Arguments + /// * `virtual_data` - Optional virtual block read result containing the data to be processed + /// * `batch_size_hint` - Optional hint for controlling the size of generated record batches + /// + /// # Returns + /// * `Result` - A paster object that can merge virtual column data + /// into DataBlocks, or an error if creation fails + pub fn try_create_virtual_column_paster( &self, - mut data_block: DataBlock, virtual_data: Option, - ) -> Result { + batch_size_hint: Option, + ) -> Result { let orig_schema = virtual_data .as_ref() .map(|virtual_data| virtual_data.schema.clone()) .unwrap_or_default(); - let record_batch = virtual_data - .map(|virtual_data| { - let columns_chunks = virtual_data.data.columns_chunks()?; - column_chunks_to_record_batch( - &virtual_data.schema, - virtual_data.num_rows, - &columns_chunks, - &virtual_data.compression, - ) - }) - .transpose()?; + + let record_batches = if let Some(virtual_data) = virtual_data { + let columns_chunks = virtual_data.data.columns_chunks()?; + let chunks = column_chunks_to_record_batch( + &virtual_data.schema, + virtual_data.num_rows, + &columns_chunks, + &virtual_data.compression, + batch_size_hint, + )?; + Some(chunks) + } else { + None + }; + + let function_context = self.ctx.get_function_context()?; + + // Unfortunately, Paster cannot hold references to the fields that being cloned, + // since the caller `DeserializeDataTransform` will take mutable reference of + // VirtualColumnReader indirectly. + Ok(VirtualColumnDataPaster { + record_batches, + function_context, + next_record_batch_index: 0, + virtual_column_fields: self.virtual_column_info.virtual_column_fields.clone(), + source_schema: self.source_schema.clone(), + orig_schema, + }) + } +} + +pub struct VirtualColumnDataPaster { + record_batches: Option>, + next_record_batch_index: usize, + function_context: FunctionContext, + virtual_column_fields: Vec, + source_schema: TableSchemaRef, + orig_schema: TableSchemaRef, +} + +impl VirtualColumnDataPaster { + /// Processes a DataBlock by adding virtual columns to it. + /// + /// This method enriches the provided DataBlock with virtual columns by either: + /// 1. Using pre-computed virtual column data from deserialized record batches if available + /// 2. Computing virtual column values on-the-fly from source columns + /// + /// For each virtual column field: + /// - If the corresponding data exists in record batches, it is extracted and added directly + /// - If not available in record batches, it is computed from source columns using path extraction + /// - Type casting is performed if the source data type doesn't match the target virtual column type + /// + /// The method tracks which record batch to use via an internal counter that advances with each call. + /// + /// # Arguments + /// * `data_block` - The input DataBlock to which virtual columns will be added + /// + /// # Returns + /// * `Result` - The modified DataBlock containing the original columns plus virtual columns, + /// or an error if the operation fails + /// + /// # Note + /// This method must be called sequentially for each data block. The internal state keeps track of + /// which pre-computed record batch to use for each call. + pub fn paste_virtual_column(&mut self, mut data_block: DataBlock) -> Result { + let record_batch = if let Some(record_batches) = &self.record_batches { + assert!(record_batches.len() > self.next_record_batch_index); + Some(&record_batches[self.next_record_batch_index]) + } else { + None + }; + + self.next_record_batch_index += 1; + + let func_ctx = &self.function_context; // If the virtual column has already generated, add it directly, // otherwise extract it from the source column - let func_ctx = self.ctx.get_function_context()?; - for virtual_column_field in self.virtual_column_info.virtual_column_fields.iter() { + for virtual_column_field in self.virtual_column_fields.iter() { let name = format!("{}", virtual_column_field.column_id); if let Some(arrow_array) = record_batch .as_ref() .and_then(|r| r.column_by_name(&name).cloned()) { - let orig_field = orig_schema.field_with_name(&name).unwrap(); + let orig_field = self.orig_schema.field_with_name(&name).unwrap(); let orig_type: DataType = orig_field.data_type().into(); let value = Value::Column(Column::from_arrow_rs(arrow_array, &orig_type)?); let data_type: DataType = virtual_column_field.data_type.as_ref().into(); @@ -189,7 +268,7 @@ impl VirtualColumnReader { None, &cast_func_name, [(value, orig_type)], - &func_ctx, + func_ctx, data_block.num_rows(), &BUILTIN_FUNCTIONS, )?; @@ -215,7 +294,7 @@ impl VirtualColumnReader { None, "get_by_keypath", [src_arg, path_arg], - &func_ctx, + func_ctx, data_block.num_rows(), &BUILTIN_FUNCTIONS, )?; @@ -225,7 +304,7 @@ impl VirtualColumnReader { None, cast_func_name, [(value, data_type)], - &func_ctx, + func_ctx, data_block.num_rows(), &BUILTIN_FUNCTIONS, )?; diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs index 1add3998f2b4c..36f0aab489778 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::collections::VecDeque; use std::ops::BitAnd; use std::sync::Arc; use std::time::Instant; @@ -23,6 +24,7 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; +use databend_common_catalog::query_kind::QueryKind; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -64,7 +66,7 @@ pub struct DeserializeDataTransform { input: Arc, output: Arc, - output_data: Option, + output_data: VecDeque, src_schema: DataSchema, output_schema: DataSchema, parts: Vec, @@ -79,6 +81,8 @@ pub struct DeserializeDataTransform { need_reserve_block_info: bool, need_wait_runtime_filter: bool, runtime_filter_ready: Option>, + + batch_size_hint: Option, } unsafe impl Send for DeserializeDataTransform {} @@ -97,6 +101,12 @@ impl DeserializeDataTransform { let need_wait_runtime_filter = !ctx.get_cluster().is_empty() && ctx.get_wait_runtime_filter(plan.scan_id); + // Unfortunately, the batch size hint is only safe for Query now. + let batch_size_hint = match ctx.get_query_kind() { + QueryKind::Query => Some(ctx.get_settings().get_fuse_parquet_read_batch_size()?), + _ => None, + }; + let mut src_schema: DataSchema = (block_reader.schema().as_ref()).into(); if let Some(virtual_reader) = virtual_reader.as_ref() { let mut fields = src_schema.fields().clone(); @@ -114,6 +124,7 @@ impl DeserializeDataTransform { output_schema.remove_internal_fields(); let output_schema: DataSchema = (&output_schema).into(); let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index); + Ok(ProcessorPtr::create(Box::new(DeserializeDataTransform { ctx, table_index: plan.table_index, @@ -122,7 +133,7 @@ impl DeserializeDataTransform { block_reader, input, output, - output_data: None, + output_data: VecDeque::new(), src_schema, output_schema, parts: vec![], @@ -134,10 +145,11 @@ impl DeserializeDataTransform { need_reserve_block_info, need_wait_runtime_filter, runtime_filter_ready: None, + batch_size_hint, }))) } - fn runtime_filter(&mut self, data_block: DataBlock) -> Result> { + fn runtime_filter(&mut self, data_block: &DataBlock) -> Result> { // Check if already cached runtime filters if self.cached_runtime_filter.is_none() { let bloom_filters = self.ctx.get_bloom_runtime_filter_with_id(self.table_index); @@ -222,7 +234,7 @@ impl Processor for DeserializeDataTransform { return Ok(Event::NeedConsume); } - if let Some(data_block) = self.output_data.take() { + if let Some(data_block) = self.output_data.pop_front() { self.output.push_data(Ok(data_block)); return Ok(Event::NeedConsume); } @@ -264,48 +276,43 @@ impl Processor for DeserializeDataTransform { match read_res { ParquetDataSource::AggIndex((actual_part, data)) => { let agg_index_reader = self.index_reader.as_ref().as_ref().unwrap(); - let block = agg_index_reader.deserialize_parquet_data(actual_part, data)?; - - let progress_values = ProgressValues { - rows: block.num_rows(), - bytes: block.memory_size(), - }; - self.scan_progress.incr(&progress_values); - Profile::record_usize_profile( - ProfileStatisticsName::ScanBytes, - block.memory_size(), - ); - - self.output_data = Some(block); + let blocks = agg_index_reader.deserialize_parquet_data( + actual_part, + data, + self.batch_size_hint, + )?; + + self.update_scan_metrics(blocks.as_slice()); + + self.output_data = blocks.into(); } + ParquetDataSource::Normal((data, virtual_data)) => { let start = Instant::now(); let columns_chunks = data.columns_chunks()?; let part = FuseBlockPartInfo::from_part(&part)?; - let mut data_block = self.block_reader.deserialize_parquet_chunks( + let data_blocks = self.block_reader.deserialize_parquet_to_blocks( part.nums_rows, &part.columns_meta, columns_chunks, &part.compression, &part.location, + self.batch_size_hint, )?; - let origin_num_rows = data_block.num_rows(); - - let mut filter = None; - if self.ctx.has_bloom_runtime_filters(self.table_index) { - if let Some(bitmap) = self.runtime_filter(data_block.clone())? { - data_block = data_block.filter_with_bitmap(&bitmap)?; - filter = Some(bitmap); - } - } - - // Add optional virtual columns - if let Some(virtual_reader) = self.virtual_reader.as_ref() { - data_block = virtual_reader - .deserialize_virtual_columns(data_block.clone(), virtual_data)?; - } + // Initialize virtual column paster if needed: which will add virtual columns to + // each DataBlock. The paster is created from the VirtualColumnReader and maintains + // internal state to track which record batch of virtual column data to use for each DataBlock. + let mut virtual_columns_paster = + if let Some(virtual_column_reader) = self.virtual_reader.as_ref() { + Some(virtual_column_reader.try_create_virtual_column_paster( + virtual_data, + self.batch_size_hint, + )?) + } else { + None + }; // Perf. { @@ -314,42 +321,57 @@ impl Processor for DeserializeDataTransform { ); } - let progress_values = ProgressValues { - rows: data_block.num_rows(), - bytes: data_block.memory_size(), - }; - self.scan_progress.incr(&progress_values); - Profile::record_usize_profile( - ProfileStatisticsName::ScanBytes, - data_block.memory_size(), - ); - - let mut data_block = - data_block.resort(&self.src_schema, &self.output_schema)?; - - // Fill `BlockMetaIndex` as `DataBlock.meta` if query internal columns, - // `TransformAddInternalColumns` will generate internal columns using `BlockMetaIndex` in next pipeline. - let offsets = if self.block_reader.query_internal_columns() { - filter.as_ref().map(|bitmap| { - (0..origin_num_rows) - .filter(|i| unsafe { bitmap.get_bit_unchecked(*i) }) - .collect() - }) - } else { - None - }; - - data_block = add_data_block_meta( - data_block, - part, - offsets, - self.base_block_ids.clone(), - self.block_reader.update_stream_columns(), - self.block_reader.query_internal_columns(), - self.need_reserve_block_info, - )?; + self.update_scan_metrics(data_blocks.as_slice()); + + let mut output_blocks = VecDeque::with_capacity(data_blocks.len()); + for mut data_block in data_blocks { + let origin_num_rows = data_block.num_rows(); + + let mut filter = None; + if self.ctx.has_bloom_runtime_filters(self.table_index) { + if let Some(bitmap) = self.runtime_filter(&data_block)? { + data_block = data_block.filter_with_bitmap(&bitmap)?; + filter = Some(bitmap); + } + } + + // Process virtual columns if available: This step enriches the DataBlock + // with virtual columns that were not originally present. + // The paster was created earlier from the VirtualColumnReader and maintains + // the state necessary to merge virtual columns into each data block in + // sequence, ensuring each block gets the correct corresponding virtual data. + if let Some(virtual_columns_paster) = &mut virtual_columns_paster { + data_block = virtual_columns_paster.paste_virtual_column(data_block)?; + } - self.output_data = Some(data_block); + let mut data_block = + data_block.resort(&self.src_schema, &self.output_schema)?; + + // Fill `BlockMetaIndex` as `DataBlock.meta` if query internal columns, + // `TransformAddInternalColumns` will generate internal columns using `BlockMetaIndex` in next pipeline. + let offsets = if self.block_reader.query_internal_columns() { + filter.as_ref().map(|bitmap| { + (0..origin_num_rows) + .filter(|i| unsafe { bitmap.get_bit_unchecked(*i) }) + .collect() + }) + } else { + None + }; + + data_block = add_data_block_meta( + data_block, + part, + offsets, + self.base_block_ids.clone(), + self.block_reader.update_stream_columns(), + self.block_reader.query_internal_columns(), + self.need_reserve_block_info, + )?; + output_blocks.push_back(data_block); + } + + self.output_data = output_blocks; } } } @@ -370,3 +392,18 @@ impl Processor for DeserializeDataTransform { Ok(()) } } + +impl DeserializeDataTransform { + fn update_scan_metrics(&self, blocks: &[DataBlock]) { + let (num_rows, memory_size) = blocks.iter().fold((0, 0), |(rows, size), block| { + (block.num_rows() + rows, block.memory_size() + size) + }); + + let progress_values = ProgressValues { + rows: num_rows, + bytes: memory_size, + }; + self.scan_progress.incr(&progress_values); + Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, memory_size); + } +} diff --git a/tests/sqllogictests/suites/tpch/spill.test b/tests/sqllogictests/suites/tpch/spill.test index c6aa2cea290b0..eea949c3fc125 100644 --- a/tests/sqllogictests/suites/tpch/spill.test +++ b/tests/sqllogictests/suites/tpch/spill.test @@ -34,6 +34,7 @@ FROM ( SELECT * FROM lineitem, part + ORDER BY lineitem.l_orderkey, part.p_partkey LIMIT 10000000 ); ----