From 2f6925fda894b00e1de1be0ab62c80533f1ebcf5 Mon Sep 17 00:00:00 2001 From: zhyass Date: Mon, 30 Jun 2025 22:05:25 +0800 Subject: [PATCH 01/12] improve stream write --- .../fuse/src/io/write/stream/block_builder.rs | 26 +- .../io/write/stream/column_ndv_estimator.rs | 141 +++++++++ .../src/io/write/stream/column_statistics.rs | 253 --------------- .../write/stream/column_statistics_builder.rs | 289 ++++++++++++++++++ .../write/stream/column_statistics_state.rs | 103 +++++++ .../storages/fuse/src/io/write/stream/mod.rs | 9 +- 6 files changed, 558 insertions(+), 263 deletions(-) create mode 100644 src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs delete mode 100644 src/query/storages/fuse/src/io/write/stream/column_statistics.rs create mode 100644 src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs create mode 100644 src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 7193d988952e8..21a8736ba67eb 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -35,6 +35,7 @@ use databend_common_expression::TableSchemaRef; use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID; use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE; use databend_common_native::write::NativeWriter; +use databend_common_sql::executor::physical_plans::MutationKind; use databend_storages_common_index::BloomIndex; use databend_storages_common_index::BloomIndexBuilder; use databend_storages_common_index::Index; @@ -52,7 +53,7 @@ use parquet::file::properties::WriterProperties; use crate::io::create_inverted_index_builders; use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder; use crate::io::write::stream::cluster_statistics::ClusterStatisticsState; -use crate::io::write::stream::column_statistics::ColumnStatisticsState; +use crate::io::write::stream::ColumnStatisticsState; use crate::io::write::InvertedIndexState; use crate::io::BlockSerialization; use crate::io::BloomIndexState; @@ -254,7 +255,7 @@ impl StreamBlockBuilder { pub fn need_flush(&self) -> bool { let file_size = self.block_writer.compressed_size(); self.row_count >= self.properties.block_thresholds.min_rows_per_block - || self.block_size >= self.properties.block_thresholds.max_bytes_per_block + || self.block_size >= self.properties.block_thresholds.min_bytes_per_block * 2 || (file_size >= self.properties.block_thresholds.min_compressed_per_block && self.block_size >= self.properties.block_thresholds.min_bytes_per_block) } @@ -385,8 +386,8 @@ pub struct StreamBlockProperties { source_schema: TableSchemaRef, cluster_stats_builder: Arc, - stats_columns: Vec, - distinct_columns: Vec, + stats_columns: Vec<(ColumnId, DataType)>, + distinct_columns: Vec<(ColumnId, DataType)>, bloom_columns_map: BTreeMap, ngram_args: Vec, inverted_index_builders: Vec, @@ -398,16 +399,23 @@ impl StreamBlockProperties { pub fn try_create( ctx: Arc, table: &FuseTable, + kind: MutationKind, table_meta_timestamps: TableMetaTimestamps, ) -> Result> { // remove virtual computed fields. - let fields = table + let mut fields = table .schema() .fields() .iter() .filter(|f| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_)))) .cloned() .collect::>(); + if !matches!(kind, MutationKind::Insert | MutationKind::Replace) { + // add stream fields. 
+ for stream_column in table.stream_columns().iter() { + fields.push(stream_column.table_field()); + } + } let source_schema = Arc::new(TableSchema { fields, @@ -438,12 +446,12 @@ impl StreamBlockProperties { let leaf_fields = source_schema.leaf_fields(); for field in leaf_fields.iter() { let column_id = field.column_id(); - if RangeIndex::supported_type(&DataType::from(field.data_type())) - && column_id != ORIGIN_BLOCK_ROW_NUM_COLUMN_ID + let data_type = DataType::from(field.data_type()); + if RangeIndex::supported_type(&data_type) && column_id != ORIGIN_BLOCK_ROW_NUM_COLUMN_ID { - stats_columns.push(column_id); + stats_columns.push((column_id, data_type.clone())); if !bloom_column_ids.contains(&column_id) { - distinct_columns.push(column_id); + distinct_columns.push((column_id, data_type)); } } } diff --git a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs new file mode 100644 index 0000000000000..0e0d64e96805b --- /dev/null +++ b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs @@ -0,0 +1,141 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::hash::Hash; +use std::marker::PhantomData; + +use databend_common_expression::types::boolean::TrueIdxIter; +use databend_common_expression::types::DataType; +use databend_common_expression::types::DateType; +use databend_common_expression::types::Decimal128Type; +use databend_common_expression::types::Decimal256Type; +use databend_common_expression::types::Decimal64Type; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::NumberType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::TimestampType; +use databend_common_expression::types::ValueType; +use databend_common_expression::with_number_mapped_type; +use databend_common_expression::Column; +use databend_common_expression::ScalarRef; +use databend_common_expression::SELECTIVITY_THRESHOLD; +use databend_storages_common_table_meta::meta::ColumnDistinctHLL; + +pub trait ColumnNDVEstimator: Send + Sync { + fn update_column(&mut self, column: &Column); + fn update_scalar(&mut self, scalar: &ScalarRef); + fn finalize(&self) -> u64; +} + +pub fn create_column_ndv_estimator(data_type: &DataType) -> Box { + let inner_type = data_type.remove_nullable(); + with_number_mapped_type!(|NUM_TYPE| match inner_type { + DataType::Number(NumberDataType::NUM_TYPE) => { + ColumnNDVEstimatorImpl::>::create() + } + DataType::String => { + ColumnNDVEstimatorImpl::::create() + } + DataType::Date => { + ColumnNDVEstimatorImpl::::create() + } + DataType::Timestamp => { + ColumnNDVEstimatorImpl::::create() + } + DataType::Decimal(size) => { + if size.can_carried_by_64() { + ColumnNDVEstimatorImpl::::create() + } else if size.can_carried_by_128() { + ColumnNDVEstimatorImpl::::create() + } else { + ColumnNDVEstimatorImpl::::create() + } + } + _ => unreachable!("Unsupported data 
type: {:?}", data_type), + }) +} + +pub struct ColumnNDVEstimatorImpl +where + T: ValueType + Send + Sync, + for<'a> T::ScalarRef<'a>: Hash, +{ + hll: ColumnDistinctHLL, + _phantom: PhantomData, +} + +impl ColumnNDVEstimatorImpl +where + T: ValueType + Send + Sync, + for<'a> T::ScalarRef<'a>: Hash, +{ + pub fn create() -> Box { + Box::new(Self { + hll: ColumnDistinctHLL::new(), + _phantom: Default::default(), + }) + } +} + +impl ColumnNDVEstimator for ColumnNDVEstimatorImpl +where + T: ValueType + Send + Sync, + for<'a> T::ScalarRef<'a>: Hash, +{ + fn update_column(&mut self, column: &Column) { + let (column, validity) = if let Column::Nullable(box inner) = column { + let validity = if inner.validity.null_count() == 0 { + None + } else { + Some(&inner.validity) + }; + (&inner.column, validity) + } else { + (column, None) + }; + + let column = T::try_downcast_column(column).unwrap(); + if let Some(v) = validity { + if v.true_count() as f64 / v.len() as f64 >= SELECTIVITY_THRESHOLD { + for (data, valid) in T::iter_column(&column).zip(v.iter()) { + if valid { + self.hll.add_object(&data); + } + } + } else { + TrueIdxIter::new(v.len(), Some(v)).for_each(|idx| { + let val = unsafe { T::index_column_unchecked(&column, idx) }; + self.hll.add_object(&val); + }) + } + } else { + for value in T::iter_column(&column) { + self.hll.add_object(&value); + } + } + } + + fn update_scalar(&mut self, scalar: &ScalarRef) { + if matches!(scalar, ScalarRef::Null) { + return; + } + + let val = T::try_downcast_scalar(scalar).unwrap(); + self.hll.add_object(&val); + } + + fn finalize(&self) -> u64 { + self.hll.count() as u64 + } +} diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics.rs deleted file mode 100644 index 402504e3a207e..0000000000000 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics.rs +++ /dev/null @@ -1,253 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashMap; - -use databend_common_exception::Result; -use databend_common_expression::types::AccessType; -use databend_common_expression::types::DataType; -use databend_common_expression::types::DateType; -use databend_common_expression::types::DecimalColumn; -use databend_common_expression::types::DecimalScalar; -use databend_common_expression::types::NumberDataType; -use databend_common_expression::types::NumberType; -use databend_common_expression::types::StringType; -use databend_common_expression::types::TimestampType; -use databend_common_expression::with_number_mapped_type; -use databend_common_expression::Column; -use databend_common_expression::ColumnId; -use databend_common_expression::DataBlock; -use databend_common_expression::Scalar; -use databend_common_expression::ScalarRef; -use databend_common_expression::TableSchemaRef; -use databend_common_expression::Value; -use databend_common_functions::aggregates::eval_aggr; -use databend_storages_common_table_meta::meta::ColumnDistinctHLL; -use databend_storages_common_table_meta::meta::ColumnStatistics; -use databend_storages_common_table_meta::meta::StatisticsOfColumns; - -use crate::statistics::reducers::reduce_column_statistics; -use crate::statistics::traverse_values_dfs; -use crate::statistics::Trim; - -pub struct ColumnStatisticsState { - col_stats: HashMap>, - distinct_columns: HashMap, -} - -impl ColumnStatisticsState { - pub fn new(stats_columns: &[ColumnId], distinct_columns: &[ColumnId]) -> Self { - let col_stats = stats_columns - .iter() - .map(|&col_id| (col_id, Vec::new())) - .collect(); - - let distinct_columns = distinct_columns - .iter() - .map(|&col_id| (col_id, ColumnDistinctHLL::default())) - .collect(); - - Self { - col_stats, - distinct_columns, - } - } - - pub fn add_block(&mut self, schema: &TableSchemaRef, data_block: &DataBlock) -> Result<()> { - let rows = data_block.num_rows(); - let leaves = traverse_values_dfs(data_block.columns(), schema.fields())?; - for (column_id, col, data_type) in leaves { - match col { - Value::Scalar(s) => { - let unset_bits = if s == Scalar::Null { rows } else { 0 }; - // when we read it back from parquet, it is a Column instead of Scalar - let in_memory_size = s.as_ref().estimated_scalar_repeat_size(rows, &data_type); - let col_stats = ColumnStatistics::new( - s.clone(), - s.clone(), - unset_bits as u64, - in_memory_size as u64, - None, - ); - if let Some(hll) = self.distinct_columns.get_mut(&column_id) { - scalar_update_hll_cardinality(&s.as_ref(), &data_type, hll); - } - self.col_stats.get_mut(&column_id).unwrap().push(col_stats); - } - Value::Column(col) => { - // later, during the evaluation of expressions, name of field does not matter - let mut min = Scalar::Null; - let mut max = Scalar::Null; - - let (mins, _) = eval_aggr("min", vec![], &[col.clone().into()], rows, vec![])?; - if mins.len() > 0 { - min = if let Some(v) = mins.index(0) { - // safe upwrap. 
- v.to_owned().trim_min().unwrap() - } else { - self.col_stats.remove(&column_id); - continue; - } - } - - let (maxs, _) = eval_aggr("max", vec![], &[col.clone().into()], rows, vec![])?; - if maxs.len() > 0 { - max = if let Some(v) = maxs.index(0) { - if let Some(v) = v.to_owned().trim_max() { - v - } else { - self.col_stats.remove(&column_id); - continue; - } - } else { - self.col_stats.remove(&column_id); - continue; - } - } - - let (is_all_null, bitmap) = col.validity(); - let unset_bits = match (is_all_null, bitmap) { - (true, _) => rows, - (false, Some(bitmap)) => bitmap.null_count(), - (false, None) => 0, - }; - let in_memory_size = col.memory_size() as u64; - let col_stats = - ColumnStatistics::new(min, max, unset_bits as u64, in_memory_size, None); - self.col_stats.get_mut(&column_id).unwrap().push(col_stats); - - // use distinct count calculated by the xor hash function to avoid repetitive operation. - if let Some(hll) = self.distinct_columns.get_mut(&column_id) { - column_update_hll_cardinality(&col, &data_type, hll); - } - } - } - } - Ok(()) - } - - pub fn finalize( - self, - column_distinct_count: HashMap, - ) -> Result { - let mut statistics = StatisticsOfColumns::with_capacity(self.col_stats.len()); - for (id, stats) in &self.col_stats { - let mut col_stats = reduce_column_statistics(stats); - if let Some(count) = column_distinct_count.get(id) { - col_stats.distinct_of_values = Some(*count as u64); - } else if let Some(hll) = self.distinct_columns.get(id) { - col_stats.distinct_of_values = Some(hll.count() as u64); - } - statistics.insert(*id, col_stats); - } - Ok(statistics) - } -} - -fn column_update_hll_cardinality(col: &Column, ty: &DataType, hll: &mut ColumnDistinctHLL) { - if let DataType::Nullable(inner) = ty { - let col = col.as_nullable().unwrap(); - for (i, v) in col.validity.iter().enumerate() { - if v { - let scalar = col.column.index(i).unwrap(); - scalar_update_hll_cardinality(&scalar, inner, hll); - } - } - return; - } - - with_number_mapped_type!(|NUM_TYPE| match ty { - DataType::Number(NumberDataType::NUM_TYPE) => { - let col = NumberType::::try_downcast_column(col).unwrap(); - for v in col.iter() { - hll.add_object(v); - } - } - DataType::String => { - let col = StringType::try_downcast_column(col).unwrap(); - for v in col.iter() { - hll.add_object(&v); - } - } - DataType::Date => { - let col = DateType::try_downcast_column(col).unwrap(); - for v in col.iter() { - hll.add_object(v); - } - } - DataType::Timestamp => { - let col = TimestampType::try_downcast_column(col).unwrap(); - for v in col.iter() { - hll.add_object(v); - } - } - DataType::Decimal(_) => { - match col { - Column::Decimal(DecimalColumn::Decimal64(col, _)) => { - for v in col.iter() { - hll.add_object(v); - } - } - Column::Decimal(DecimalColumn::Decimal128(col, _)) => { - for v in col.iter() { - hll.add_object(v); - } - } - Column::Decimal(DecimalColumn::Decimal256(col, _)) => { - for v in col.iter() { - hll.add_object(v); - } - } - _ => unreachable!(), - }; - } - _ => unreachable!("Unsupported data type: {:?}", ty), - }); -} - -fn scalar_update_hll_cardinality(scalar: &ScalarRef, ty: &DataType, hll: &mut ColumnDistinctHLL) { - if matches!(scalar, ScalarRef::Null) { - return; - } - - let ty = ty.remove_nullable(); - - with_number_mapped_type!(|NUM_TYPE| match ty { - DataType::Number(NumberDataType::NUM_TYPE) => { - let val = NumberType::::try_downcast_scalar(scalar).unwrap(); - hll.add_object(&val); - } - DataType::String => { - let val = StringType::try_downcast_scalar(scalar).unwrap(); - 
hll.add_object(&val); - } - DataType::Date => { - let val = DateType::try_downcast_scalar(scalar).unwrap(); - hll.add_object(&val); - } - DataType::Timestamp => { - let val = TimestampType::try_downcast_scalar(scalar).unwrap(); - hll.add_object(&val); - } - DataType::Decimal(_) => { - match scalar { - ScalarRef::Decimal(DecimalScalar::Decimal64(v, _)) => hll.add_object(&v), - ScalarRef::Decimal(DecimalScalar::Decimal128(v, _)) => hll.add_object(&v), - ScalarRef::Decimal(DecimalScalar::Decimal256(v, _)) => hll.add_object(&v), - _ => unreachable!(), - } - } - _ => unreachable!("Unsupported data type: {:?}", ty), - }); -} diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs new file mode 100644 index 0000000000000..c806a180da32b --- /dev/null +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs @@ -0,0 +1,289 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::marker::PhantomData; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::boolean::TrueIdxIter; +use databend_common_expression::types::DataType; +use databend_common_expression::types::DateType; +use databend_common_expression::types::Decimal; +use databend_common_expression::types::Decimal128Type; +use databend_common_expression::types::Decimal256Type; +use databend_common_expression::types::Decimal64Type; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::NumberType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::TimestampType; +use databend_common_expression::types::ValueType; +use databend_common_expression::with_number_mapped_type; +use databend_common_expression::Column; +use databend_common_expression::Scalar; +use databend_common_expression::ScalarRef; +use databend_common_expression::SELECTIVITY_THRESHOLD; +use databend_storages_common_table_meta::meta::ColumnStatistics; + +use crate::statistics::Trim; + +pub trait ColumnStatisticsBuilder: Send + Sync { + fn update_column(&mut self, column: &Column); + + fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType); + + fn finalize(self: Box) -> Result; +} + +pub fn create_column_stats_builder(data_type: &DataType) -> Box { + let inner_type = data_type.remove_nullable(); + with_number_mapped_type!(|NUM_TYPE| match inner_type { + DataType::Number(NumberDataType::NUM_TYPE) => { + GenericColumnStatisticsBuilder::, CommonAdapter>::create( + inner_type, + ) + } + DataType::String => { + GenericColumnStatisticsBuilder::::create(inner_type) + } + DataType::Date => { + GenericColumnStatisticsBuilder::::create(inner_type) + } + DataType::Timestamp => { + GenericColumnStatisticsBuilder::::create(inner_type) + } + DataType::Decimal(size) => { + if size.can_carried_by_64() { + 
GenericColumnStatisticsBuilder::::create(inner_type) + } else if size.can_carried_by_128() { + GenericColumnStatisticsBuilder::::create(inner_type) + } else { + GenericColumnStatisticsBuilder::::create(inner_type) + } + } + _ => unreachable!("Unsupported data type: {:?}", data_type), + }) +} + +pub trait ColumnStatisticsAdapter: Send + Sync { + type Value: Clone + Send + Sync; + + fn scalar_to_value(val: T::ScalarRef<'_>) -> Self::Value; + + fn value_to_scalar(val: Self::Value) -> T::Scalar; + + fn update_value(value: &mut Self::Value, scalar: T::ScalarRef<'_>, ordering: Ordering); +} + +struct CommonAdapter; + +impl ColumnStatisticsAdapter for CommonAdapter +where + T: ValueType, + T::Scalar: Send + Sync, + for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, +{ + type Value = T::Scalar; + + fn scalar_to_value(val: T::ScalarRef<'_>) -> Self::Value { + T::to_owned_scalar(val) + } + + fn value_to_scalar(val: Self::Value) -> T::Scalar { + val + } + + fn update_value(value: &mut Self::Value, scalar: T::ScalarRef<'_>, ordering: Ordering) { + if scalar.partial_cmp(&T::to_scalar_ref(value)) == Some(ordering) { + *value = T::to_owned_scalar(scalar); + } + } +} + +struct DecimalAdapter; + +impl ColumnStatisticsAdapter for DecimalAdapter +where + T: ValueType, + T::Scalar: Decimal + Send + Sync, + for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, +{ + type Value = ::U64Array; + + fn scalar_to_value(val: T::ScalarRef<'_>) -> Self::Value { + T::Scalar::to_u64_array(T::to_owned_scalar(val)) + } + + fn value_to_scalar(val: Self::Value) -> T::Scalar { + T::Scalar::from_u64_array(val) + } + + fn update_value(value: &mut Self::Value, scalar: T::ScalarRef<'_>, ordering: Ordering) { + let val = T::Scalar::from_u64_array(*value); + if scalar.partial_cmp(&T::to_scalar_ref(&val)) == Some(ordering) { + *value = T::Scalar::to_u64_array(T::to_owned_scalar(scalar)); + } + } +} + +struct GenericColumnStatisticsBuilder +where + T: ValueType, + A: ColumnStatisticsAdapter, +{ + min: Option, + max: Option, + null_count: usize, + in_memory_size: usize, + data_type: DataType, + + _phantom: PhantomData<(T, A)>, +} + +impl GenericColumnStatisticsBuilder +where + T: ValueType + Send + Sync, + T::Scalar: Send + Sync, + A: ColumnStatisticsAdapter + 'static, + for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, +{ + fn create(data_type: DataType) -> Box { + Box::new(Self { + min: None, + max: None, + null_count: 0, + in_memory_size: 0, + data_type, + _phantom: PhantomData, + }) + } + + fn add_batch<'a, I>(&mut self, mut iter: I) + where I: Iterator> { + let first = iter.next().unwrap(); + let mut min = first.clone(); + let mut max = first; + for v in iter { + if matches!(min.partial_cmp(&v), Some(Ordering::Greater)) { + min = v; + continue; + } + + if matches!(max.partial_cmp(&v), Some(Ordering::Less)) { + max = v; + } + } + + self.add(min, max); + } + + fn add(&mut self, min: T::ScalarRef<'_>, max: T::ScalarRef<'_>) { + if let Some(val) = self.min.as_mut() { + A::update_value(val, min, Ordering::Less); + } else { + self.min = Some(A::scalar_to_value(min)); + } + + if let Some(val) = self.max.as_mut() { + A::update_value(val, max, Ordering::Greater); + } else { + self.max = Some(A::scalar_to_value(max)); + } + } +} + +impl ColumnStatisticsBuilder for GenericColumnStatisticsBuilder +where + T: ValueType + Send + Sync, + T::Scalar: Send + Sync, + A: ColumnStatisticsAdapter + 'static, + for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, +{ + fn update_column(&mut self, column: &Column) { + self.in_memory_size += column.memory_size(); + let (column, validity) = if 
let Column::Nullable(box inner) = column {
+            let validity = if inner.validity.null_count() == 0 {
+                None
+            } else {
+                Some(&inner.validity)
+            };
+            (&inner.column, validity)
+        } else {
+            (column, None)
+        };
+        self.null_count += validity.map_or(0, |v| v.null_count());
+
+        let column = T::try_downcast_column(column).unwrap();
+        if let Some(v) = validity {
+            if v.true_count() as f64 / v.len() as f64 >= SELECTIVITY_THRESHOLD {
+                let column_iter = T::iter_column(&column);
+                let value_iter = column_iter
+                    .zip(v.iter())
+                    .filter(|(_, v)| *v)
+                    .map(|(v, _)| v);
+                self.add_batch(value_iter);
+            } else {
+                for idx in TrueIdxIter::new(v.len(), Some(v)) {
+                    let v = unsafe { T::index_column_unchecked(&column, idx) };
+                    self.add(v.clone(), v);
+                }
+            }
+        } else {
+            let column_iter = T::iter_column(&column);
+            self.add_batch(column_iter);
+        }
+    }
+
+    fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType) {
+        // When we read it back from parquet, it is a Column instead of a Scalar.
+        self.in_memory_size += scalar.estimated_scalar_repeat_size(num_rows, data_type);
+        if scalar.is_null() {
+            self.null_count += num_rows;
+            return;
+        }
+
+        let val = T::try_downcast_scalar(scalar).unwrap();
+        self.add(val.clone(), val);
+    }
+
+    fn finalize(self: Box<Self>) -> Result<ColumnStatistics> {
+        let min = if let Some(v) = self.min {
+            let v = A::value_to_scalar(v);
+            // safe unwrap.
+            T::upcast_scalar_with_type(v, &self.data_type)
+                .trim_min()
+                .unwrap()
+        } else {
+            Scalar::Null
+        };
+        let max = if let Some(v) = self.max {
+            let v = A::value_to_scalar(v);
+            if let Some(v) = T::upcast_scalar_with_type(v, &self.data_type).trim_max() {
+                v
+            } else {
+                return Err(ErrorCode::Internal("Unable to trim string"));
+            }
+        } else {
+            Scalar::Null
+        };
+
+        Ok(ColumnStatistics::new(
+            min,
+            max,
+            self.null_count as u64,
+            self.in_memory_size as u64,
+            None,
+        ))
+    }
+}
diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
new file mode 100644
index 0000000000000..b7e51bb552bdb
--- /dev/null
+++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
@@ -0,0 +1,103 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +use std::collections::HashMap; + +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::ColumnId; +use databend_common_expression::DataBlock; +use databend_common_expression::TableSchemaRef; +use databend_common_expression::Value; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; + +use crate::io::write::stream::create_column_ndv_estimator; +use crate::io::write::stream::create_column_stats_builder; +use crate::io::write::stream::ColumnNDVEstimator; +use crate::io::write::stream::ColumnStatisticsBuilder; +use crate::statistics::traverse_values_dfs; + +pub struct ColumnStatisticsState { + col_stats: HashMap>, + distinct_columns: HashMap>, +} + +impl ColumnStatisticsState { + pub fn new( + stats_columns: &[(ColumnId, DataType)], + distinct_columns: &[(ColumnId, DataType)], + ) -> Self { + let col_stats = stats_columns + .iter() + .map(|(col_id, data_type)| (*col_id, create_column_stats_builder(data_type))) + .collect(); + + let distinct_columns = distinct_columns + .iter() + .map(|(col_id, data_type)| (*col_id, create_column_ndv_estimator(data_type))) + .collect(); + + Self { + col_stats, + distinct_columns, + } + } + + pub fn add_block(&mut self, schema: &TableSchemaRef, data_block: &DataBlock) -> Result<()> { + let rows = data_block.num_rows(); + let leaves = traverse_values_dfs(data_block.columns(), schema.fields())?; + for (column_id, col, data_type) in leaves { + match col { + Value::Scalar(s) => { + self.col_stats.get_mut(&column_id).unwrap().update_scalar( + &s.as_ref(), + rows, + &data_type, + ); + if let Some(estimator) = self.distinct_columns.get_mut(&column_id) { + estimator.update_scalar(&s.as_ref()); + } + } + Value::Column(col) => { + self.col_stats + .get_mut(&column_id) + .unwrap() + .update_column(&col); + // use distinct count calculated by the xor hash function to avoid repetitive operation. 
+ if let Some(estimator) = self.distinct_columns.get_mut(&column_id) { + estimator.update_column(&col); + } + } + } + } + Ok(()) + } + + pub fn finalize( + self, + column_distinct_count: HashMap, + ) -> Result { + let mut statistics = StatisticsOfColumns::with_capacity(self.col_stats.len()); + for (id, stats) in self.col_stats { + let mut col_stats = stats.finalize()?; + if let Some(count) = column_distinct_count.get(&id) { + col_stats.distinct_of_values = Some(*count as u64); + } else if let Some(estimator) = self.distinct_columns.get(&id) { + col_stats.distinct_of_values = Some(estimator.finalize()); + } + statistics.insert(id, col_stats); + } + Ok(statistics) + } +} diff --git a/src/query/storages/fuse/src/io/write/stream/mod.rs b/src/query/storages/fuse/src/io/write/stream/mod.rs index 26d32ee679582..3eda792ec7433 100644 --- a/src/query/storages/fuse/src/io/write/stream/mod.rs +++ b/src/query/storages/fuse/src/io/write/stream/mod.rs @@ -14,7 +14,14 @@ mod block_builder; mod cluster_statistics; -mod column_statistics; +mod column_ndv_estimator; +mod column_statistics_builder; +mod column_statistics_state; pub(crate) use block_builder::StreamBlockBuilder; pub(crate) use block_builder::StreamBlockProperties; +pub(crate) use column_ndv_estimator::create_column_ndv_estimator; +pub(crate) use column_ndv_estimator::ColumnNDVEstimator; +pub(crate) use column_statistics_builder::create_column_stats_builder; +pub(crate) use column_statistics_builder::ColumnStatisticsBuilder; +pub(crate) use column_statistics_state::ColumnStatisticsState; From d99614758abe316b9505800648394a74a746c37f Mon Sep 17 00:00:00 2001 From: zhyass Date: Tue, 1 Jul 2025 18:49:31 +0800 Subject: [PATCH 02/12] update --- src/query/settings/src/settings_default.rs | 7 - .../settings/src/settings_getter_setter.rs | 4 - .../storages/common/index/src/bloom_index.rs | 92 ++++++++--- src/query/storages/fuse/src/fuse_table.rs | 7 + .../fuse/src/io/write/block_writer.rs | 8 + .../fuse/src/io/write/bloom_index_writer.rs | 1 + .../src/io/write/inverted_index_writer.rs | 1 + .../storages/fuse/src/operations/append.rs | 26 ++-- .../src/operations/common/processors/mod.rs | 1 + .../processors/transform_block_writer.rs | 143 +++++++++++------- 10 files changed, 192 insertions(+), 98 deletions(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 9b32bbbc233fd..c1c982a7af915 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1319,13 +1319,6 @@ impl DefaultSettings { scope: SettingScope::Both, range: None, }), - ("enable_block_stream_write", DefaultSettingValue { - value: UserSettingValue::UInt64(0), - desc: "Enables block stream write", - mode: SettingMode::Both, - scope: SettingScope::Both, - range: Some(SettingRange::Numeric(0..=1)), - }), ("trace_sample_rate", DefaultSettingValue { value: UserSettingValue::UInt64(1), desc: "Setting the trace sample rate. The value should be between '0' and '100'", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 0e9a7617a1ac9..14f8c822ce990 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -978,10 +978,6 @@ impl Settings { self.set_setting("optimizer_skip_list".to_string(), v) } - pub fn get_enable_block_stream_write(&self) -> Result { - Ok(self.try_get_u64("enable_block_stream_write")? 
== 1) - } - pub fn get_statement_queue_ttl_in_seconds(&self) -> Result { self.try_get_u64("statement_queue_ttl_in_seconds") } diff --git a/src/query/storages/common/index/src/bloom_index.rs b/src/query/storages/common/index/src/bloom_index.rs index bf5ec35035e64..59d06f6298a49 100644 --- a/src/query/storages/common/index/src/bloom_index.rs +++ b/src/query/storages/common/index/src/bloom_index.rs @@ -14,6 +14,7 @@ use std::collections::BTreeMap; use std::collections::HashMap; +use std::hash::DefaultHasher; use std::hash::Hasher; use std::ops::ControlFlow; use std::ops::Deref; @@ -35,12 +36,18 @@ use databend_common_expression::types::BinaryType; use databend_common_expression::types::Bitmap; use databend_common_expression::types::Buffer; use databend_common_expression::types::DataType; +use databend_common_expression::types::DateType; use databend_common_expression::types::MapType; use databend_common_expression::types::NullableType; use databend_common_expression::types::Number; use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::NumberType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::TimestampType; use databend_common_expression::types::UInt64Type; +use databend_common_expression::types::ValueType; use databend_common_expression::visit_expr; +use databend_common_expression::with_number_mapped_type; use databend_common_expression::BlockEntry; use databend_common_expression::Column; use databend_common_expression::ColumnBuilder; @@ -349,6 +356,71 @@ impl BloomIndex { Ok(column) } + pub fn calculate_digest_by_type(data_type: &DataType, column: &Column) -> Result> { + let inner_type = data_type.remove_nullable(); + with_number_mapped_type!(|NUM_TYPE| match inner_type { + DataType::Number(NumberDataType::NUM_TYPE) => { + Self::calculate_nullable_column_digests::>(column) + } + DataType::String => { + Self::calculate_nullable_column_digests::(column) + } + DataType::Date => { + Self::calculate_nullable_column_digests::(column) + } + DataType::Timestamp => { + Self::calculate_nullable_column_digests::(column) + } + _ => Err(ErrorCode::Internal(format!( + "Unsupported data type: {:?}", + data_type + ))), + }) + } + + #[inline(always)] + fn hash_one(v: &T) -> u64 { + let mut hasher = DefaultHasher::default(); + DFHash::hash(v, &mut hasher); + hasher.finish() + } + + fn calculate_nullable_column_digests(column: &Column) -> Result> + where for<'a> T::ScalarRef<'a>: DFHash { + let (column, validity) = if let Column::Nullable(box inner) = column { + let validity = if inner.validity.null_count() == 0 { + None + } else { + Some(&inner.validity) + }; + (&inner.column, validity) + } else { + (column, None) + }; + + let capacity = validity.map_or(column.len(), |v| v.true_count() + 1); + let mut result = Vec::with_capacity(capacity); + if validity.is_some() { + result.push(0); + } + let column = T::try_downcast_column(column).unwrap(); + if let Some(validity) = validity { + let column_iter = T::iter_column(&column); + let value_iter = column_iter + .zip(validity.iter()) + .filter(|(_, v)| *v) + .map(|(v, _)| v); + for value in value_iter { + result.push(Self::hash_one(&value)); + } + } else { + for value in T::iter_column(&column) { + result.push(Self::hash_one(&value)); + } + } + Ok(result) + } + /// calculate digest for column that may have null values /// /// returns (column, validity) where column is the digest of the column @@ -734,24 +806,8 @@ impl BloomIndexBuilder { } }; - let (column, validity) = - 
BloomIndex::calculate_nullable_column_digest(&self.func_ctx, &column, &data_type)?; - // create filter per column - if validity.as_ref().map(|v| v.null_count()).unwrap_or(0) > 0 { - let validity = validity.unwrap(); - let it = column.deref().iter().zip(validity.iter()).map( - |(v, b)| { - if !b { - &0 - } else { - v - } - }, - ); - index_column.builder.add_digests(it); - } else { - index_column.builder.add_digests(column.deref()); - } + let column = BloomIndex::calculate_digest_by_type(&data_type, &column)?; + index_column.builder.add_digests(column.deref()); } for index_column in self.ngram_columns.iter_mut() { let field_type = &block.data_type(index_column.index); diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 96c17992f4141..b99db259c2f39 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -745,6 +745,13 @@ impl FuseTable { ) }) } + + pub fn enable_stream_block_write(&self) -> bool { + matches!(self.storage_format, FuseStorageFormat::Parquet) + && self + .cluster_type() + .is_none_or(|v| matches!(v, ClusterType::Hilbert)) + } } #[async_trait::async_trait] diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs index c6593a248c8af..ade34fd17609f 100644 --- a/src/query/storages/fuse/src/io/write/block_writer.rs +++ b/src/query/storages/fuse/src/io/write/block_writer.rs @@ -20,6 +20,8 @@ use std::time::Instant; use chrono::Utc; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; +use databend_common_expression::local_block_meta_serde; +use databend_common_expression::BlockMetaInfo; use databend_common_expression::Column; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; @@ -124,6 +126,7 @@ pub async fn write_data(data: Vec, data_accessor: &Operator, location: &str) Ok(()) } +#[derive(Debug)] pub struct BlockSerialization { pub block_raw_data: Vec, pub block_meta: BlockMeta, @@ -132,6 +135,11 @@ pub struct BlockSerialization { pub virtual_column_state: Option, } +local_block_meta_serde!(BlockSerialization); + +#[typetag::serde(name = "block_serialization_meta")] +impl BlockMetaInfo for BlockSerialization {} + #[derive(Clone)] pub struct BlockBuilder { pub ctx: Arc, diff --git a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs index ec49070a6f08f..738c33ac2f2c3 100644 --- a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs @@ -40,6 +40,7 @@ use opendal::Operator; use crate::io::BlockReader; use crate::FuseStorageFormat; +#[derive(Debug)] pub struct BloomIndexState { pub(crate) data: Vec, pub(crate) size: u64, diff --git a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs index 74377a86108cb..8cf0b5f2355f0 100644 --- a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs @@ -121,6 +121,7 @@ pub fn create_inverted_index_builders(table_meta: &TableMeta) -> Vec, pub(crate) size: u64, diff --git a/src/query/storages/fuse/src/operations/append.rs b/src/query/storages/fuse/src/operations/append.rs index 9316374128528..d1ca44fd41d4c 100644 --- a/src/query/storages/fuse/src/operations/append.rs +++ b/src/query/storages/fuse/src/operations/append.rs @@ 
-37,10 +37,11 @@ use databend_common_sql::executor::physical_plans::MutationKind; use databend_storages_common_table_meta::meta::TableMetaTimestamps; use databend_storages_common_table_meta::table::ClusterType; +use crate::io::StreamBlockProperties; +use crate::operations::TransformBlockBuilder; use crate::operations::TransformBlockWriter; use crate::operations::TransformSerializeBlock; use crate::statistics::ClusterStatsGenerator; -use crate::FuseStorageFormat; use crate::FuseTable; impl FuseTable { @@ -50,19 +51,22 @@ impl FuseTable { pipeline: &mut Pipeline, table_meta_timestamps: TableMetaTimestamps, ) -> Result<()> { - let enable_stream_block_write = ctx.get_settings().get_enable_block_stream_write()? - && matches!(self.storage_format, FuseStorageFormat::Parquet); + let enable_stream_block_write = self.enable_stream_block_write(); if enable_stream_block_write { + let properties = StreamBlockProperties::try_create( + ctx.clone(), + self, + MutationKind::Insert, + table_meta_timestamps, + )?; + pipeline.add_transform(|input, output| { - TransformBlockWriter::try_create( - ctx.clone(), - input, - output, - self, - table_meta_timestamps, - false, - ) + TransformBlockBuilder::try_create(input, output, properties.clone()) })?; + + pipeline.add_async_accumulating_transformer(|| { + TransformBlockWriter::create(ctx.clone(), MutationKind::Insert, self, false) + }); } else { let block_thresholds = self.get_block_thresholds(); build_compact_block_pipeline(pipeline, block_thresholds)?; diff --git a/src/query/storages/fuse/src/operations/common/processors/mod.rs b/src/query/storages/fuse/src/operations/common/processors/mod.rs index e0e3d3b25f25a..d43c569c14016 100644 --- a/src/query/storages/fuse/src/operations/common/processors/mod.rs +++ b/src/query/storages/fuse/src/operations/common/processors/mod.rs @@ -22,6 +22,7 @@ mod transform_serialize_segment; pub use multi_table_insert_commit::CommitMultiTableInsert; pub use sink_commit::CommitSink; +pub use transform_block_writer::TransformBlockBuilder; pub use transform_block_writer::TransformBlockWriter; pub use transform_merge_commit_meta::TransformMergeCommitMeta; pub use transform_mutation_aggregator::TableMutationAggregator; diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs b/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs index ce11a4834c7fb..207085d51fd26 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs @@ -22,35 +22,36 @@ use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; -use databend_common_io::constants::DEFAULT_BLOCK_ROW_COUNT; +use databend_common_metrics::storage::metrics_inc_recluster_write_block_nums; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::Processor; use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_transforms::AsyncAccumulatingTransform; +use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_storage::MutationStatus; -use 
databend_storages_common_table_meta::meta::TableMetaTimestamps; use opendal::Operator; use crate::io::BlockSerialization; use crate::io::BlockWriter; use crate::io::StreamBlockBuilder; use crate::io::StreamBlockProperties; +use crate::operations::MutationLogEntry; +use crate::operations::MutationLogs; use crate::FuseTable; -use crate::FUSE_OPT_KEY_ROW_PER_BLOCK; -#[allow(clippy::large_enum_variant)] enum State { Consume, Collect(DataBlock), Serialize, Finalize, Flush, - Write(BlockSerialization), } -pub struct TransformBlockWriter { +pub struct TransformBlockBuilder { state: State, input: Arc, output: Arc, @@ -62,43 +63,27 @@ pub struct TransformBlockWriter { input_data_size: usize, input_num_rows: usize, - dal: Operator, - // Only used in multi table insert - table_id: Option, - - max_block_rows: usize, input_data: VecDeque, output_data: Option, } -impl TransformBlockWriter { +impl TransformBlockBuilder { pub fn try_create( - ctx: Arc, input: Arc, output: Arc, - table: &FuseTable, - table_meta_timestamps: TableMetaTimestamps, - with_tid: bool, + properties: Arc, ) -> Result { - let max_block_rows = std::cmp::min( - ctx.get_settings().get_max_block_size()? as usize, - table.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_BLOCK_ROW_COUNT), - ); - let properties = StreamBlockProperties::try_create(ctx, table, table_meta_timestamps)?; - Ok(ProcessorPtr::create(Box::new(TransformBlockWriter { + Ok(ProcessorPtr::create(Box::new(TransformBlockBuilder { state: State::Consume, input, output, properties, builder: None, - dal: table.get_operator(), need_flush: false, - table_id: if with_tid { Some(table.get_id()) } else { None }, input_data: VecDeque::new(), input_data_size: 0, input_num_rows: 0, output_data: None, - max_block_rows, }))) } @@ -111,23 +96,23 @@ impl TransformBlockWriter { Ok(self.builder.as_mut().unwrap()) } - fn calc_max_block_rows(&self, block: &DataBlock) -> usize { + fn split_input(&self, input: DataBlock) -> Vec { let min_bytes_per_block = self.properties.block_thresholds.min_bytes_per_block; - let block_size = block.estimate_block_size(); - if block_size < min_bytes_per_block { - return self.max_block_rows; + let block_size = input.estimate_block_size(); + if block_size <= min_bytes_per_block { + return vec![input]; } - let num_rows = block.num_rows(); + let num_rows = input.num_rows(); let average_row_size = block_size.div_ceil(num_rows); let max_rows = min_bytes_per_block.div_ceil(average_row_size); - self.max_block_rows.min(max_rows) + input.split_by_rows_no_tail(max_rows) } } #[async_trait] -impl Processor for TransformBlockWriter { +impl Processor for TransformBlockBuilder { fn name(&self) -> String { - "TransformBlockWriter".to_string() + "TransformBlockBuilder".to_string() } fn as_any(&mut self) -> &mut dyn Any { @@ -135,15 +120,15 @@ impl Processor for TransformBlockWriter { } fn event(&mut self) -> Result { - match &self.state { - State::Collect(_) | State::Serialize | State::Flush | State::Finalize => { - return Ok(Event::Sync) - } - State::Write(_) => return Ok(Event::Async), - _ => {} + if matches!( + self.state, + State::Collect(_) | State::Serialize | State::Flush | State::Finalize + ) { + return Ok(Event::Sync); } if self.output.is_finished() { + self.input.finish(); return Ok(Event::Finished); } @@ -198,8 +183,7 @@ impl Processor for TransformBlockWriter { block.check_valid()?; self.input_data_size += block.estimate_block_size(); self.input_num_rows += block.num_rows(); - let max_rows_per_block = self.calc_max_block_rows(&block); - let blocks = 
block.split_by_rows_no_tail(max_rows_per_block); + let blocks = self.split_input(block); self.input_data.extend(blocks); } State::Serialize => { @@ -227,7 +211,7 @@ impl Processor for TransformBlockWriter { let builder = self.builder.take().unwrap(); if !builder.is_empty() { let serialized = builder.finish()?; - self.state = State::Write(serialized); + self.output_data = Some(DataBlock::empty_with_meta(Box::new(serialized))); } self.need_flush = false; } @@ -235,11 +219,41 @@ impl Processor for TransformBlockWriter { } Ok(()) } +} - #[async_backtrace::framed] - async fn async_process(&mut self) -> Result<()> { - match std::mem::replace(&mut self.state, State::Consume) { - State::Write(serialized) => { +pub struct TransformBlockWriter { + kind: MutationKind, + dal: Operator, + ctx: Arc, + // Only used in multi table insert + table_id: Option, +} + +impl TransformBlockWriter { + pub fn create( + ctx: Arc, + kind: MutationKind, + table: &FuseTable, + with_tid: bool, + ) -> Self { + Self { + ctx, + dal: table.get_operator(), + table_id: if with_tid { Some(table.get_id()) } else { None }, + kind, + } + } +} + +#[async_trait::async_trait] +impl AsyncAccumulatingTransform for TransformBlockWriter { + const NAME: &'static str = "TransformBlockWriter"; + + async fn transform(&mut self, data: DataBlock) -> Result> { + debug_assert!(data.is_empty()); + + if let Some(ptr) = data.get_owned_meta() { + if let Some(serialized) = BlockSerialization::downcast_from(ptr) { let extended_block_meta = BlockWriter::write_down(&self.dal, serialized).await?; let bytes = if let Some(draft_virtual_block_meta) = @@ -251,32 +265,45 @@ impl Processor for TransformBlockWriter { extended_block_meta.block_meta.block_size as usize }; - self.properties - .ctx - .get_write_progress() - .incr(&ProgressValues { - rows: extended_block_meta.block_meta.row_count as usize, - bytes, - }); + self.ctx.get_write_progress().incr(&ProgressValues { + rows: extended_block_meta.block_meta.row_count as usize, + bytes, + }); // appending new data block if let Some(tid) = self.table_id { - self.properties.ctx.update_multi_table_insert_status( + self.ctx.update_multi_table_insert_status( tid, extended_block_meta.block_meta.row_count, ); } else { - self.properties.ctx.add_mutation_status(MutationStatus { + self.ctx.add_mutation_status(MutationStatus { insert_rows: extended_block_meta.block_meta.row_count, update_rows: 0, deleted_rows: 0, }); } - self.output_data = Some(DataBlock::empty_with_meta(Box::new(extended_block_meta))); + let output = if matches!(self.kind, MutationKind::Insert) { + DataBlock::empty_with_meta(Box::new(extended_block_meta)) + } else { + if matches!(self.kind, MutationKind::Recluster) { + metrics_inc_recluster_write_block_nums(); + } + + DataBlock::empty_with_meta(Box::new(MutationLogs { + entries: vec![MutationLogEntry::AppendBlock { + block_meta: Arc::new(extended_block_meta), + }], + })) + }; + + return Ok(Some(output)); } - _ => return Err(ErrorCode::Internal("It's a bug.")), } - Ok(()) + + Err(ErrorCode::Internal( + "Cannot downcast meta to BlockSerialization", + )) } } From fb41f0cce55ea84d151275153836655f6e6bf03d Mon Sep 17 00:00:00 2001 From: zhyass Date: Wed, 2 Jul 2025 01:22:09 +0800 Subject: [PATCH 03/12] fix test --- .../io/write/stream/column_ndv_estimator.rs | 20 ++++++----- .../write/stream/column_statistics_builder.rs | 26 +++++++++----- .../write/stream/column_statistics_state.rs | 15 +++++++- .../fuse/src/statistics/column_statistic.rs | 36 ++++++++++--------- 4 files changed, 61 insertions(+), 36 
deletions(-)

diff --git a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs
index 0e0d64e96805b..cb138c53a1d8c 100644
--- a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs
+++ b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs
@@ -94,15 +94,17 @@ where
     for<'a> T::ScalarRef<'a>: Hash,
 {
     fn update_column(&mut self, column: &Column) {
-        let (column, validity) = if let Column::Nullable(box inner) = column {
-            let validity = if inner.validity.null_count() == 0 {
-                None
-            } else {
-                Some(&inner.validity)
-            };
-            (&inner.column, validity)
-        } else {
-            (column, None)
+        let (column, validity) = match column {
+            Column::Nullable(box inner) => {
+                let validity = if inner.validity.null_count() == 0 {
+                    None
+                } else {
+                    Some(&inner.validity)
+                };
+                (&inner.column, validity)
+            }
+            Column::Null { .. } => return,
+            column => (column, None),
         };
 
         let column = T::try_downcast_column(column).unwrap();
diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs
index c806a180da32b..3178e5da00ef3 100644
--- a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs
+++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs
@@ -212,15 +212,23 @@ where
 {
     fn update_column(&mut self, column: &Column) {
         self.in_memory_size += column.memory_size();
-        let (column, validity) = if let Column::Nullable(box inner) = column {
-            let validity = if inner.validity.null_count() == 0 {
-                None
-            } else {
-                Some(&inner.validity)
-            };
-            (&inner.column, validity)
-        } else {
-            (column, None)
+        if column.len() == 0 {
+            return;
+        }
+        let (column, validity) = match column {
+            Column::Nullable(box inner) => {
+                let validity = if inner.validity.null_count() == 0 {
+                    None
+                } else {
+                    Some(&inner.validity)
+                };
+                (&inner.column, validity)
+            }
+            Column::Null { len } => {
+                self.null_count += *len;
+                return;
+            }
+            col => (col, None),
         };
         self.null_count += validity.map_or(0, |v| v.null_count());
 
diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
index b7e51bb552bdb..f4746b9e04e7e 100644
--- a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
+++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
@@ -92,9 +92,22 @@ impl ColumnStatisticsState {
         for (id, stats) in self.col_stats {
             let mut col_stats = stats.finalize()?;
             if let Some(count) = column_distinct_count.get(&id) {
-                col_stats.distinct_of_values = Some(*count as u64);
+                // The value calculated by the xor hash function includes NULL, so we need to subtract one.
+ let distinct_of_values = if col_stats.null_count > 0 { + *count as u64 - 1 + } else { + *count as u64 + }; + col_stats.distinct_of_values = Some(distinct_of_values); } else if let Some(estimator) = self.distinct_columns.get(&id) { col_stats.distinct_of_values = Some(estimator.finalize()); + } else { + assert_eq!(col_stats.min, col_stats.max); + if col_stats.min.is_null() { + col_stats.distinct_of_values = Some(0); + } else { + col_stats.distinct_of_values = Some(1); + } } statistics.insert(id, col_stats); } diff --git a/src/query/storages/fuse/src/statistics/column_statistic.rs b/src/query/storages/fuse/src/statistics/column_statistic.rs index 36737dd9e7a62..d02cf23a96883 100644 --- a/src/query/storages/fuse/src/statistics/column_statistic.rs +++ b/src/query/storages/fuse/src/statistics/column_statistic.rs @@ -83,37 +83,39 @@ pub fn gen_columns_statistics( let mut min = Scalar::Null; let mut max = Scalar::Null; - let (mins, _) = eval_aggr("min", vec![], &[col.clone().into()], rows, vec![])?; - let (maxs, _) = eval_aggr("max", vec![], &[col.clone().into()], rows, vec![])?; + if col.len() > 0 { + let (mins, _) = eval_aggr("min", vec![], &[col.clone().into()], rows, vec![])?; + let (maxs, _) = eval_aggr("max", vec![], &[col.clone().into()], rows, vec![])?; - if mins.len() > 0 { - min = if let Some(v) = mins.index(0) { - if let Some(v) = v.to_owned().trim_min() { - v + if mins.len() > 0 { + min = if let Some(v) = mins.index(0) { + if let Some(v) = v.to_owned().trim_min() { + v + } else { + continue; + } } else { continue; } - } else { - continue; } - } - if maxs.len() > 0 { - max = if let Some(v) = maxs.index(0) { - if let Some(v) = v.to_owned().trim_max() { - v + if maxs.len() > 0 { + max = if let Some(v) = maxs.index(0) { + if let Some(v) = v.to_owned().trim_max() { + v + } else { + continue; + } } else { continue; } - } else { - continue; } } let (is_all_null, bitmap) = col.validity(); let unset_bits = match (is_all_null, bitmap) { - (true, _) => rows, - (false, Some(bitmap)) => bitmap.null_count(), + (_, Some(bitmap)) => bitmap.null_count(), + (true, None) => rows, (false, None) => 0, }; From e8af28cd3f5f3fcfd458056442c752f7649c00ab Mon Sep 17 00:00:00 2001 From: zhyass Date: Wed, 2 Jul 2025 02:40:38 +0800 Subject: [PATCH 04/12] fix test --- .../write/stream/column_statistics_state.rs | 11 ++++---- .../processors/transform_block_writer.rs | 27 ++++++++++--------- .../09_0004_remote_insert_into_select.test | 3 --- .../09_0008_fuse_optimize_table.test | 3 --- .../suites/base/issues/issue_18275.test | 3 --- 5 files changed, 21 insertions(+), 26 deletions(-) diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs index f4746b9e04e7e..8bfa47498e316 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs @@ -102,11 +102,12 @@ impl ColumnStatisticsState { } else if let Some(estimator) = self.distinct_columns.get(&id) { col_stats.distinct_of_values = Some(estimator.finalize()); } else { - assert_eq!(col_stats.min, col_stats.max); - if col_stats.min.is_null() { - col_stats.distinct_of_values = Some(0); - } else { - col_stats.distinct_of_values = Some(1); + if col_stats.min == col_stats.max { + if col_stats.min.is_null() { + col_stats.distinct_of_values = Some(0); + } else { + col_stats.distinct_of_values = Some(1); + } } } statistics.insert(id, col_stats); diff --git 
a/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs b/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs index 207085d51fd26..895f5c7a0ebc5 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs @@ -63,7 +63,7 @@ pub struct TransformBlockBuilder { input_data_size: usize, input_num_rows: usize, - input_data: VecDeque, + input_data: VecDeque<(usize, DataBlock)>, output_data: Option, } @@ -97,14 +97,15 @@ impl TransformBlockBuilder { } fn split_input(&self, input: DataBlock) -> Vec { - let min_bytes_per_block = self.properties.block_thresholds.min_bytes_per_block; let block_size = input.estimate_block_size(); - if block_size <= min_bytes_per_block { - return vec![input]; - } let num_rows = input.num_rows(); let average_row_size = block_size.div_ceil(num_rows); - let max_rows = min_bytes_per_block.div_ceil(average_row_size); + let max_rows = self + .properties + .block_thresholds + .min_bytes_per_block + .div_ceil(average_row_size) + .min(self.properties.block_thresholds.max_rows_per_block); input.split_by_rows_no_tail(max_rows) } } @@ -181,14 +182,16 @@ impl Processor for TransformBlockBuilder { State::Collect(block) => { // Check if the datablock is valid, this is needed to ensure data is correct block.check_valid()?; - self.input_data_size += block.estimate_block_size(); self.input_num_rows += block.num_rows(); - let blocks = self.split_input(block); - self.input_data.extend(blocks); + for block in self.split_input(block) { + let block_size = block.estimate_block_size(); + self.input_data_size += block_size; + self.input_data.push_back((block_size, block)); + } } State::Serialize => { - while let Some(b) = self.input_data.pop_front() { - self.input_data_size -= b.estimate_block_size(); + while let Some((block_size, b)) = self.input_data.pop_front() { + self.input_data_size -= block_size; self.input_num_rows -= b.num_rows(); let builder = self.get_or_create_builder()?; @@ -201,7 +204,7 @@ impl Processor for TransformBlockBuilder { } } State::Finalize => { - while let Some(b) = self.input_data.pop_front() { + while let Some((_, b)) = self.input_data.pop_front() { let builder = self.get_or_create_builder()?; builder.write(b)?; } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0004_remote_insert_into_select.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0004_remote_insert_into_select.test index 5e399dc1b406f..5ad4e316896d0 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0004_remote_insert_into_select.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0004_remote_insert_into_select.test @@ -7,9 +7,6 @@ CREATE DATABASE db_09_004 statement ok USE db_09_004 -statement ok -set enable_block_stream_write = 1 - statement ok CREATE TABLE IF NOT EXISTS t1(a UInt8 not null, b UInt64 not null, c Int8 not null, d Int64 not null, e Date not null, f Date not null, g DateTime not null, h String not null) Engine = Fuse diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test index 43a2b262ca2f9..c19a27a9e8890 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test @@ -7,9 +7,6 @@ CREATE DATABASE db_09_0008 statement ok USE db_09_0008 
-statement ok
-set enable_block_stream_write = 1
-
 statement ok
 create table t(a uint64 not null)

diff --git a/tests/sqllogictests/suites/base/issues/issue_18275.test b/tests/sqllogictests/suites/base/issues/issue_18275.test
index 36217cda4b3ca..ce895d228329e 100644
--- a/tests/sqllogictests/suites/base/issues/issue_18275.test
+++ b/tests/sqllogictests/suites/base/issues/issue_18275.test
@@ -14,9 +14,6 @@ CREATE OR REPLACE TABLE product_test (
     stock INT
 );
 
-statement ok
-set enable_block_stream_write = 1;
-
 statement ok
 INSERT INTO product_test (id, name, category, price, stock) VALUES(6, 'Keyboard', 'Electronics', 79.99, 25),

From 7574cd455afc2f80b3922bbd4727ca0ca53f9365 Mon Sep 17 00:00:00 2001
From: zhyass
Date: Wed, 2 Jul 2025 02:57:30 +0800
Subject: [PATCH 05/12] make lint

---
 .../src/io/write/stream/column_statistics_state.rs | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
index 8bfa47498e316..0e246911ea542 100644
--- a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
+++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
@@ -101,13 +101,12 @@ impl ColumnStatisticsState {
             col_stats.distinct_of_values = Some(distinct_of_values);
         } else if let Some(estimator) = self.distinct_columns.get(&id) {
             col_stats.distinct_of_values = Some(estimator.finalize());
-        } else {
-            if col_stats.min == col_stats.max {
-                if col_stats.min.is_null() {
-                    col_stats.distinct_of_values = Some(0);
-                } else {
-                    col_stats.distinct_of_values = Some(1);
-                }
+        } else if col_stats.min == col_stats.max {
+            // The bloom index skips large string columns, so no distinct count was computed for them; infer one here when min == max.
+ if col_stats.min.is_null() { + col_stats.distinct_of_values = Some(0); + } else { + col_stats.distinct_of_values = Some(1); } } statistics.insert(id, col_stats); From 1215ffd860edd35741b200f1bd87a03c3d6c4b5e Mon Sep 17 00:00:00 2001 From: zhyass Date: Sat, 5 Jul 2025 03:43:45 +0800 Subject: [PATCH 06/12] fix virtual column builder --- Cargo.lock | 1 + src/query/storages/fuse/Cargo.toml | 1 + .../fuse/src/io/write/stream/block_builder.rs | 33 ++- .../write/stream/column_statistics_builder.rs | 143 +++++++++---- .../write/stream/column_statistics_state.rs | 3 +- .../storages/fuse/src/io/write/stream/mod.rs | 3 + .../io/write/stream/virtual_column_builder.rs | 202 ++++++++++++++++++ 7 files changed, 347 insertions(+), 39 deletions(-) create mode 100644 src/query/storages/fuse/src/io/write/stream/virtual_column_builder.rs diff --git a/Cargo.lock b/Cargo.lock index 315cd64e0e5e5..8eebaad296bb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4339,6 +4339,7 @@ dependencies = [ "databend-storages-common-table-meta", "divan", "enum-as-inner", + "enum_dispatch", "fastrace", "futures", "futures-util", diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index 811821ec8088d..1e704da63bad9 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -50,6 +50,7 @@ backoff = { workspace = true, features = ["futures", "tokio"] } bytes = { workspace = true } chrono = { workspace = true } enum-as-inner = { workspace = true } +enum_dispatch = { workspace = true } fastrace = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 21a8736ba67eb..f4f03cd83caed 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -35,6 +35,7 @@ use databend_common_expression::TableSchemaRef; use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID; use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE; use databend_common_native::write::NativeWriter; +use databend_common_native::write::WriteOptions; use databend_common_sql::executor::physical_plans::MutationKind; use databend_storages_common_index::BloomIndex; use databend_storages_common_index::BloomIndexBuilder; @@ -54,6 +55,7 @@ use crate::io::create_inverted_index_builders; use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder; use crate::io::write::stream::cluster_statistics::ClusterStatisticsState; use crate::io::write::stream::ColumnStatisticsState; +use crate::io::write::stream::VirtualColumnWriter; use crate::io::write::InvertedIndexState; use crate::io::BlockSerialization; use crate::io::BloomIndexState; @@ -189,7 +191,7 @@ impl StreamBlockBuilder { let writer = NativeWriter::new( buffer, properties.source_schema.as_ref().clone(), - databend_common_native::write::WriteOptions { + WriteOptions { default_compression: properties.write_settings.table_compression.into(), max_page_size: Some(properties.write_settings.max_page_size), default_compress_ratio, @@ -234,11 +236,18 @@ impl StreamBlockBuilder { ClusterStatisticsState::new(properties.cluster_stats_builder.clone()); let column_stats_state = ColumnStatisticsState::new(&properties.stats_columns, &properties.distinct_columns); + let virtual_column_writer = properties.virtual_column_builder.as_ref().map(|builder| { + VirtualColumnWriter::create( + builder.clone(), + 
properties.write_settings.table_compression, + ) + }); Ok(StreamBlockBuilder { properties, block_writer, inverted_index_writers, + virtual_column_writer, bloom_index_builder, virtual_column_builder, row_count: 0, @@ -333,6 +342,12 @@ impl StreamBlockBuilder { None }; + let virtual_column_state = if let Some(writer) = self.virtual_column_writer.take() { + Some(writer.finalize(&block_location)?) + } else { + None + }; + let col_metas = self.block_writer.finish(&self.properties.source_schema)?; let block_raw_data = mem::take(self.block_writer.inner_mut()); @@ -363,7 +378,10 @@ impl StreamBlockBuilder { compression: self.properties.write_settings.table_compression.into(), inverted_index_size, create_on: Some(Utc::now()), - ngram_filter_index_size: None, + ngram_filter_index_size: bloom_index_state + .as_ref() + .map(|v| v.ngram_size) + .unwrap_or_default(), virtual_block_meta: None, }; let serialized = BlockSerialization { @@ -391,6 +409,7 @@ pub struct StreamBlockProperties { bloom_columns_map: BTreeMap, ngram_args: Vec, inverted_index_builders: Vec, + virtual_column_builder: Option, table_meta_timestamps: TableMetaTimestamps, support_virtual_columns: bool, } @@ -437,6 +456,15 @@ impl StreamBlockProperties { .collect::>(); let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta); + let virtual_column_builder = if ctx + .get_settings() + .get_enable_refresh_virtual_column_after_write() + .unwrap_or_default() + { + VirtualColumnBuilder::try_create(ctx.clone(), table, source_schema.clone()).ok() + } else { + None + }; let cluster_stats_builder = ClusterStatisticsBuilder::try_create(table, ctx.clone(), &source_schema)?; @@ -463,6 +491,7 @@ impl StreamBlockProperties { source_schema, write_settings, cluster_stats_builder, + virtual_column_builder, stats_columns, distinct_columns, bloom_columns_map, diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs index 3178e5da00ef3..b56f2a96f2e6a 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs @@ -24,56 +24,135 @@ use databend_common_expression::types::Decimal; use databend_common_expression::types::Decimal128Type; use databend_common_expression::types::Decimal256Type; use databend_common_expression::types::Decimal64Type; +use databend_common_expression::types::Float32Type; +use databend_common_expression::types::Float64Type; +use databend_common_expression::types::Int16Type; +use databend_common_expression::types::Int32Type; +use databend_common_expression::types::Int64Type; +use databend_common_expression::types::Int8Type; use databend_common_expression::types::NumberDataType; -use databend_common_expression::types::NumberType; use databend_common_expression::types::StringType; use databend_common_expression::types::TimestampType; +use databend_common_expression::types::UInt16Type; +use databend_common_expression::types::UInt32Type; +use databend_common_expression::types::UInt64Type; +use databend_common_expression::types::UInt8Type; use databend_common_expression::types::ValueType; -use databend_common_expression::with_number_mapped_type; use databend_common_expression::Column; use databend_common_expression::Scalar; use databend_common_expression::ScalarRef; use databend_common_expression::SELECTIVITY_THRESHOLD; use databend_storages_common_table_meta::meta::ColumnStatistics; +use 
enum_dispatch::enum_dispatch; use crate::statistics::Trim; -pub trait ColumnStatisticsBuilder: Send + Sync { - fn update_column(&mut self, column: &Column); +pub type CommonBuilder = GenericColumnStatisticsBuilder; +pub type DecimalBuilder = GenericColumnStatisticsBuilder; + +#[enum_dispatch(ColumnStatsOps)] +pub enum ColumnStatisticsBuilder { + Int8(CommonBuilder), + Int16(CommonBuilder), + Int32(CommonBuilder), + Int64(CommonBuilder), + UInt8(CommonBuilder), + UInt16(CommonBuilder), + UInt32(CommonBuilder), + UInt64(CommonBuilder), + Float32(CommonBuilder), + Float64(CommonBuilder), + String(CommonBuilder), + Date(CommonBuilder), + Timestamp(CommonBuilder), + Decimal64(DecimalBuilder), + Decimal128(DecimalBuilder), + Decimal256(DecimalBuilder), +} +#[enum_dispatch] +pub trait ColumnStatsOps { + fn update_column(&mut self, column: &Column); fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType); + fn finalize(self) -> Result; +} + +impl ColumnStatsOps for GenericColumnStatisticsBuilder +where + T: ValueType + Send + Sync, + T::Scalar: Send + Sync, + A: ColumnStatisticsAdapter + 'static, + for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, +{ + fn update_column(&mut self, column: &Column) { + GenericColumnStatisticsBuilder::update_column(self, column); + } + + fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType) { + GenericColumnStatisticsBuilder::update_scalar(self, scalar, num_rows, data_type); + } + + fn finalize(self) -> Result { + GenericColumnStatisticsBuilder::finalize(self) + } +} - fn finalize(self: Box) -> Result; +macro_rules! create_builder_for_type { + ($data_type:expr, $variant:ident, $type:ty) => { + ColumnStatisticsBuilder::$variant(CommonBuilder::<$type>::create($data_type)) + }; + ($data_type:expr, $variant:ident, $type:ty, decimal) => { + ColumnStatisticsBuilder::$variant(DecimalBuilder::<$type>::create($data_type)) + }; } -pub fn create_column_stats_builder(data_type: &DataType) -> Box { +pub fn create_column_stats_builder(data_type: &DataType) -> ColumnStatisticsBuilder { let inner_type = data_type.remove_nullable(); - with_number_mapped_type!(|NUM_TYPE| match inner_type { - DataType::Number(NumberDataType::NUM_TYPE) => { - GenericColumnStatisticsBuilder::, CommonAdapter>::create( - inner_type, - ) + match inner_type { + DataType::Number(NumberDataType::Int8) => { + create_builder_for_type!(inner_type, Int8, Int8Type) + } + DataType::Number(NumberDataType::Int16) => { + create_builder_for_type!(inner_type, Int16, Int16Type) + } + DataType::Number(NumberDataType::Int32) => { + create_builder_for_type!(inner_type, Int32, Int32Type) + } + DataType::Number(NumberDataType::Int64) => { + create_builder_for_type!(inner_type, Int64, Int64Type) } - DataType::String => { - GenericColumnStatisticsBuilder::::create(inner_type) + DataType::Number(NumberDataType::UInt8) => { + create_builder_for_type!(inner_type, UInt8, UInt8Type) } - DataType::Date => { - GenericColumnStatisticsBuilder::::create(inner_type) + DataType::Number(NumberDataType::UInt16) => { + create_builder_for_type!(inner_type, UInt16, UInt16Type) } - DataType::Timestamp => { - GenericColumnStatisticsBuilder::::create(inner_type) + DataType::Number(NumberDataType::UInt32) => { + create_builder_for_type!(inner_type, UInt32, UInt32Type) } + DataType::Number(NumberDataType::UInt64) => { + create_builder_for_type!(inner_type, UInt64, UInt64Type) + } + DataType::Number(NumberDataType::Float32) => { + create_builder_for_type!(inner_type, Float32, Float32Type) + } + 
DataType::Number(NumberDataType::Float64) => { + create_builder_for_type!(inner_type, Float64, Float64Type) + } + DataType::String => create_builder_for_type!(inner_type, String, StringType), + DataType::Date => create_builder_for_type!(inner_type, Date, DateType), + DataType::Timestamp => create_builder_for_type!(inner_type, Timestamp, TimestampType), DataType::Decimal(size) => { if size.can_carried_by_64() { - GenericColumnStatisticsBuilder::::create(inner_type) + create_builder_for_type!(inner_type, Decimal64, Decimal64Type, decimal) } else if size.can_carried_by_128() { - GenericColumnStatisticsBuilder::::create(inner_type) + create_builder_for_type!(inner_type, Decimal128, Decimal128Type, decimal) } else { - GenericColumnStatisticsBuilder::::create(inner_type) + create_builder_for_type!(inner_type, Decimal256, Decimal256Type, decimal) } } _ => unreachable!("Unsupported data type: {:?}", data_type), - }) + } } pub trait ColumnStatisticsAdapter: Send + Sync { @@ -86,7 +165,7 @@ pub trait ColumnStatisticsAdapter: Send + Sync { fn update_value(value: &mut Self::Value, scalar: T::ScalarRef<'_>, ordering: Ordering); } -struct CommonAdapter; +pub struct CommonAdapter; impl ColumnStatisticsAdapter for CommonAdapter where @@ -111,7 +190,7 @@ where } } -struct DecimalAdapter; +pub struct DecimalAdapter; impl ColumnStatisticsAdapter for DecimalAdapter where @@ -137,7 +216,7 @@ where } } -struct GenericColumnStatisticsBuilder +pub struct GenericColumnStatisticsBuilder where T: ValueType, A: ColumnStatisticsAdapter, @@ -158,15 +237,15 @@ where A: ColumnStatisticsAdapter + 'static, for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, { - fn create(data_type: DataType) -> Box { - Box::new(Self { + fn create(data_type: DataType) -> Self { + Self { min: None, max: None, null_count: 0, in_memory_size: 0, data_type, _phantom: PhantomData, - }) + } } fn add_batch<'a, I>(&mut self, mut iter: I) @@ -201,15 +280,7 @@ where self.max = Some(A::scalar_to_value(max)); } } -} -impl ColumnStatisticsBuilder for GenericColumnStatisticsBuilder -where - T: ValueType + Send + Sync, - T::Scalar: Send + Sync, - A: ColumnStatisticsAdapter + 'static, - for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, -{ fn update_column(&mut self, column: &Column) { self.in_memory_size += column.memory_size(); if column.len() == 0 { @@ -265,7 +336,7 @@ where self.add(val.clone(), val); } - fn finalize(self: Box) -> Result { + fn finalize(self) -> Result { let min = if let Some(v) = self.min { let v = A::value_to_scalar(v); // safe upwrap. 
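
A note on the rewrite above: replacing `Box<dyn ColumnStatisticsBuilder>` with a concrete enum turns every `update_column` and `update_scalar` call into a static match the compiler can inline in the per-row hot path, instead of a virtual call through a vtable. Below is a minimal, std-only sketch of the same pattern; `MinMaxBuilder`, `StatsBuilder`, and `Scalar` are invented stand-ins for the Databend types, and the hand-written `match` stands in for the impl that `#[enum_dispatch(ColumnStatsOps)]` generates.

    // Generic accumulator, analogous to GenericColumnStatisticsBuilder.
    #[derive(Default)]
    struct MinMaxBuilder<T: PartialOrd + Clone> {
        min: Option<T>,
        max: Option<T>,
        null_count: usize,
    }

    impl<T: PartialOrd + Clone> MinMaxBuilder<T> {
        fn update(&mut self, v: &T) {
            if self.min.as_ref().map_or(true, |m| v < m) {
                self.min = Some(v.clone());
            }
            if self.max.as_ref().map_or(true, |m| v > m) {
                self.max = Some(v.clone());
            }
        }
    }

    // Stand-in for a scalar value reference.
    enum Scalar {
        Null,
        Int64(i64),
        Str(String),
    }

    // One variant per supported column type, mirroring ColumnStatisticsBuilder.
    enum StatsBuilder {
        Int64(MinMaxBuilder<i64>),
        Str(MinMaxBuilder<String>),
    }

    impl StatsBuilder {
        // Equivalent of the derived ColumnStatsOps impl: a static match per
        // call rather than dynamic dispatch through a trait object.
        fn update(&mut self, scalar: &Scalar) {
            match (self, scalar) {
                (StatsBuilder::Int64(b), Scalar::Int64(v)) => b.update(v),
                (StatsBuilder::Str(b), Scalar::Str(v)) => b.update(v),
                (StatsBuilder::Int64(b), Scalar::Null) => b.null_count += 1,
                (StatsBuilder::Str(b), Scalar::Null) => b.null_count += 1,
                _ => unreachable!("scalar type does not match builder type"),
            }
        }
    }

    fn main() {
        let mut ints = StatsBuilder::Int64(MinMaxBuilder::default());
        for v in [Scalar::Int64(3), Scalar::Null, Scalar::Int64(-7), Scalar::Int64(11)] {
            ints.update(&v);
        }
        if let StatsBuilder::Int64(b) = ints {
            assert_eq!((b.min, b.max, b.null_count), (Some(-7), Some(11), 1));
        }

        let mut strs = StatsBuilder::Str(MinMaxBuilder::default());
        strs.update(&Scalar::Str("b".to_string()));
        strs.update(&Scalar::Str("a".to_string()));
        if let StatsBuilder::Str(b) = strs {
            assert_eq!((b.min.as_deref(), b.max.as_deref()), (Some("a"), Some("b")));
        }
    }

With the sixteen variants in the real enum, `enum_dispatch` saves writing this match by hand for every method of the trait.
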
diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs index 0e246911ea542..cd295526a9241 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs @@ -26,10 +26,11 @@ use crate::io::write::stream::create_column_ndv_estimator; use crate::io::write::stream::create_column_stats_builder; use crate::io::write::stream::ColumnNDVEstimator; use crate::io::write::stream::ColumnStatisticsBuilder; +use crate::io::write::stream::ColumnStatsOps; use crate::statistics::traverse_values_dfs; pub struct ColumnStatisticsState { - col_stats: HashMap>, + col_stats: HashMap, distinct_columns: HashMap>, } diff --git a/src/query/storages/fuse/src/io/write/stream/mod.rs b/src/query/storages/fuse/src/io/write/stream/mod.rs index 3eda792ec7433..dbc883500f3d8 100644 --- a/src/query/storages/fuse/src/io/write/stream/mod.rs +++ b/src/query/storages/fuse/src/io/write/stream/mod.rs @@ -17,6 +17,7 @@ mod cluster_statistics; mod column_ndv_estimator; mod column_statistics_builder; mod column_statistics_state; +mod virtual_column_builder; pub(crate) use block_builder::StreamBlockBuilder; pub(crate) use block_builder::StreamBlockProperties; @@ -24,4 +25,6 @@ pub(crate) use column_ndv_estimator::create_column_ndv_estimator; pub(crate) use column_ndv_estimator::ColumnNDVEstimator; pub(crate) use column_statistics_builder::create_column_stats_builder; pub(crate) use column_statistics_builder::ColumnStatisticsBuilder; +pub(crate) use column_statistics_builder::ColumnStatsOps; pub(crate) use column_statistics_state::ColumnStatisticsState; +pub(crate) use virtual_column_builder::VirtualColumnWriter; diff --git a/src/query/storages/fuse/src/io/write/stream/virtual_column_builder.rs b/src/query/storages/fuse/src/io/write/stream/virtual_column_builder.rs new file mode 100644 index 0000000000000..77bc5d7a6b544 --- /dev/null +++ b/src/query/storages/fuse/src/io/write/stream/virtual_column_builder.rs @@ -0,0 +1,202 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::BTreeMap; + +use databend_common_exception::Result; +use databend_common_expression::infer_schema_type; +use databend_common_expression::BlockEntry; +use databend_common_expression::DataBlock; +use databend_common_expression::TableField; +use databend_common_expression::TableSchemaRefExt; +use databend_common_expression::VIRTUAL_COLUMNS_LIMIT; +use databend_common_io::constants::DEFAULT_BLOCK_INDEX_BUFFER_SIZE; +use databend_storages_common_blocks::blocks_to_parquet; +use databend_storages_common_table_meta::meta::DraftVirtualBlockMeta; +use databend_storages_common_table_meta::meta::Location; +use databend_storages_common_table_meta::table::TableCompression; + +use crate::io::write::virtual_column_builder::VirtualColumnState; +use crate::io::TableMetaLocationGenerator; +use crate::io::VirtualColumnBuilder; +use crate::statistics::gen_columns_statistics; + +#[derive(Clone)] +pub struct VirtualColumnWriter { + table_compression: TableCompression, + variant_fields: Vec<(usize, TableField)>, + virtual_columns: Vec>, + input_rows: Vec, +} + +impl VirtualColumnWriter { + pub fn create(builder: VirtualColumnBuilder, table_compression: TableCompression) -> Self { + let filed_num = builder.variant_fields.len(); + let virtual_columns = (0..filed_num).map(|_| Vec::new()).collect(); + Self { + table_compression, + variant_fields: builder.variant_fields, + virtual_columns, + input_rows: vec![], + } + } + + pub fn add_block(&mut self, input: &DataBlock) -> Result<()> { + let num_rows = input.num_rows(); + self.input_rows.push(num_rows); + for ((offset, _), virtual_columns) in self + .variant_fields + .iter() + .zip(self.virtual_columns.iter_mut()) + { + let column = input.get_by_offset(*offset).clone(); + virtual_columns.push(column); + } + Ok(()) + } + + pub fn finalize(self, location: &Location) -> Result { + let total_rows: usize = self.input_rows.iter().sum(); + // use a tmp column id to generate statistics for virtual columns. 
+ let mut tmp_column_id = 0; + let mut virtual_column_names = Vec::new(); + let mut virtual_fields = Vec::new(); + let mut virtual_columns = Vec::new(); + + 'FOR: for ((_, source_field), columns) in self + .variant_fields + .into_iter() + .zip(self.virtual_columns.into_iter()) + { + let source_column_id = source_field.column_id; + let virtual_field_num = virtual_fields.len(); + let mut virtual_values = BTreeMap::new(); + let mut sample_done = false; + + for (column, &num_rows) in columns.iter().zip(&self.input_rows) { + let mut start_pos = 0; + if !sample_done { + // use first 10 rows as sample to check whether the block is suitable for generating virtual columns + let sample_rows = num_rows.min(10); + for row in 0..sample_rows { + VirtualColumnBuilder::extract_virtual_values( + column, + row, + virtual_field_num, + &mut virtual_values, + ); + } + + if VirtualColumnBuilder::check_sample_virtual_values( + sample_rows, + &mut virtual_values, + ) { + continue 'FOR; + } + start_pos = sample_rows; + sample_done = true; + } + for row in start_pos..num_rows { + VirtualColumnBuilder::extract_virtual_values( + column, + row, + virtual_field_num, + &mut virtual_values, + ); + } + } + VirtualColumnBuilder::discard_virtual_values( + total_rows, + virtual_field_num, + &mut virtual_values, + ); + if virtual_values.is_empty() { + continue; + } + + let value_types = VirtualColumnBuilder::inference_data_type(&virtual_values); + for ((key_paths, vals), val_type) in + virtual_values.into_iter().zip(value_types.into_iter()) + { + let (virtual_type, column) = VirtualColumnBuilder::build_column(&val_type, vals); + let virtual_table_type = infer_schema_type(&virtual_type).unwrap(); + virtual_columns.push(column.into()); + + let key_name = VirtualColumnBuilder::key_path_to_string(key_paths); + let virtual_name = format!("{}{}", source_field.name, key_name); + + let virtual_field = TableField::new_from_column_id( + &virtual_name, + virtual_table_type, + tmp_column_id, + ); + virtual_fields.push(virtual_field); + tmp_column_id += 1; + + virtual_column_names.push((source_column_id, key_name, val_type)); + } + if virtual_fields.len() >= VIRTUAL_COLUMNS_LIMIT { + break; + } + } + + // There are no suitable virtual columns, returning empty data. 
+ if virtual_fields.is_empty() { + let draft_virtual_block_meta = DraftVirtualBlockMeta { + virtual_column_metas: vec![], + virtual_column_size: 0, + virtual_location: ("".to_string(), 0), + }; + + return Ok(VirtualColumnState { + data: vec![], + draft_virtual_block_meta, + }); + } + + let virtual_block_schema = TableSchemaRefExt::create(virtual_fields); + let virtual_block = DataBlock::new(virtual_columns, total_rows); + + let columns_statistics = + gen_columns_statistics(&virtual_block, None, &virtual_block_schema)?; + + let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE); + let file_meta = blocks_to_parquet( + virtual_block_schema.as_ref(), + vec![virtual_block], + &mut data, + self.table_compression, + )?; + let draft_virtual_column_metas = VirtualColumnBuilder::file_meta_to_virtual_column_metas( + file_meta, + virtual_column_names, + columns_statistics, + )?; + + let data_size = data.len() as u64; + let virtual_column_location = + TableMetaLocationGenerator::gen_virtual_block_location(&location.0); + + let draft_virtual_block_meta = DraftVirtualBlockMeta { + virtual_column_metas: draft_virtual_column_metas, + virtual_column_size: data_size, + virtual_location: (virtual_column_location, 0), + }; + + Ok(VirtualColumnState { + data, + draft_virtual_block_meta, + }) + } +} From ae7ea9e60cf4585221747fbab6a203f683acf8d0 Mon Sep 17 00:00:00 2001 From: zhyass Date: Sun, 6 Jul 2025 01:04:07 +0800 Subject: [PATCH 07/12] fix test --- .../05_01_compact/05_01_02_load_compact_copy_row_per_block.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/suites/1_stateful/05_formats/05_01_compact/05_01_02_load_compact_copy_row_per_block.sh b/tests/suites/1_stateful/05_formats/05_01_compact/05_01_02_load_compact_copy_row_per_block.sh index 4d6cafcb184f9..862680eab447d 100755 --- a/tests/suites/1_stateful/05_formats/05_01_compact/05_01_02_load_compact_copy_row_per_block.sh +++ b/tests/suites/1_stateful/05_formats/05_01_compact/05_01_02_load_compact_copy_row_per_block.sh @@ -15,7 +15,7 @@ echo "drop table if exists t1 all" | $BENDSQL_CLIENT_CONNECT echo "CREATE TABLE t1 ( c0 string -) engine=fuse row_per_block=800; +) engine=fuse row_per_block=500; " | $BENDSQL_CLIENT_CONNECT From f8864ff3bbb1cfa807aa057a2a495c4376cea321 Mon Sep 17 00:00:00 2001 From: zhyass Date: Tue, 8 Jul 2025 02:20:55 +0800 Subject: [PATCH 08/12] fix --- .../fuse/src/io/write/stream/block_builder.rs | 36 +--- .../storages/fuse/src/io/write/stream/mod.rs | 2 - .../io/write/stream/virtual_column_builder.rs | 202 ------------------ 3 files changed, 3 insertions(+), 237 deletions(-) delete mode 100644 src/query/storages/fuse/src/io/write/stream/virtual_column_builder.rs diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index f4f03cd83caed..18262cbd83c25 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -55,7 +55,6 @@ use crate::io::create_inverted_index_builders; use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder; use crate::io::write::stream::cluster_statistics::ClusterStatisticsState; use crate::io::write::stream::ColumnStatisticsState; -use crate::io::write::stream::VirtualColumnWriter; use crate::io::write::InvertedIndexState; use crate::io::BlockSerialization; use crate::io::BloomIndexState; @@ -216,38 +215,17 @@ impl StreamBlockBuilder { &properties.ngram_args, )?; - let virtual_column_builder = if 
properties - .ctx - .get_settings() - .get_enable_refresh_virtual_column_after_write() - .unwrap_or_default() - && properties.support_virtual_columns - { - VirtualColumnBuilder::try_create( - properties.ctx.clone(), - properties.source_schema.clone(), - ) - .ok() - } else { - None - }; + let virtual_column_builder = properties.virtual_column_builder.clone(); let cluster_stats_state = ClusterStatisticsState::new(properties.cluster_stats_builder.clone()); let column_stats_state = ColumnStatisticsState::new(&properties.stats_columns, &properties.distinct_columns); - let virtual_column_writer = properties.virtual_column_builder.as_ref().map(|builder| { - VirtualColumnWriter::create( - builder.clone(), - properties.write_settings.table_compression, - ) - }); Ok(StreamBlockBuilder { properties, block_writer, inverted_index_writers, - virtual_column_writer, bloom_index_builder, virtual_column_builder, row_count: 0, @@ -342,12 +320,6 @@ impl StreamBlockBuilder { None }; - let virtual_column_state = if let Some(writer) = self.virtual_column_writer.take() { - Some(writer.finalize(&block_location)?) - } else { - None - }; - let col_metas = self.block_writer.finish(&self.properties.source_schema)?; let block_raw_data = mem::take(self.block_writer.inner_mut()); @@ -411,7 +383,6 @@ pub struct StreamBlockProperties { inverted_index_builders: Vec, virtual_column_builder: Option, table_meta_timestamps: TableMetaTimestamps, - support_virtual_columns: bool, } impl StreamBlockProperties { @@ -460,8 +431,9 @@ impl StreamBlockProperties { .get_settings() .get_enable_refresh_virtual_column_after_write() .unwrap_or_default() + && table.support_virtual_columns() { - VirtualColumnBuilder::try_create(ctx.clone(), table, source_schema.clone()).ok() + VirtualColumnBuilder::try_create(ctx.clone(), source_schema.clone()).ok() } else { None }; @@ -483,7 +455,6 @@ impl StreamBlockProperties { } } } - let support_virtual_columns = table.support_virtual_columns(); Ok(Arc::new(StreamBlockProperties { ctx, meta_locations: table.meta_location_generator().clone(), @@ -498,7 +469,6 @@ impl StreamBlockProperties { ngram_args, inverted_index_builders, table_meta_timestamps, - support_virtual_columns, })) } } diff --git a/src/query/storages/fuse/src/io/write/stream/mod.rs b/src/query/storages/fuse/src/io/write/stream/mod.rs index dbc883500f3d8..451b82e998fd0 100644 --- a/src/query/storages/fuse/src/io/write/stream/mod.rs +++ b/src/query/storages/fuse/src/io/write/stream/mod.rs @@ -17,7 +17,6 @@ mod cluster_statistics; mod column_ndv_estimator; mod column_statistics_builder; mod column_statistics_state; -mod virtual_column_builder; pub(crate) use block_builder::StreamBlockBuilder; pub(crate) use block_builder::StreamBlockProperties; @@ -27,4 +26,3 @@ pub(crate) use column_statistics_builder::create_column_stats_builder; pub(crate) use column_statistics_builder::ColumnStatisticsBuilder; pub(crate) use column_statistics_builder::ColumnStatsOps; pub(crate) use column_statistics_state::ColumnStatisticsState; -pub(crate) use virtual_column_builder::VirtualColumnWriter; diff --git a/src/query/storages/fuse/src/io/write/stream/virtual_column_builder.rs b/src/query/storages/fuse/src/io/write/stream/virtual_column_builder.rs deleted file mode 100644 index 77bc5d7a6b544..0000000000000 --- a/src/query/storages/fuse/src/io/write/stream/virtual_column_builder.rs +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in 
compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; - -use databend_common_exception::Result; -use databend_common_expression::infer_schema_type; -use databend_common_expression::BlockEntry; -use databend_common_expression::DataBlock; -use databend_common_expression::TableField; -use databend_common_expression::TableSchemaRefExt; -use databend_common_expression::VIRTUAL_COLUMNS_LIMIT; -use databend_common_io::constants::DEFAULT_BLOCK_INDEX_BUFFER_SIZE; -use databend_storages_common_blocks::blocks_to_parquet; -use databend_storages_common_table_meta::meta::DraftVirtualBlockMeta; -use databend_storages_common_table_meta::meta::Location; -use databend_storages_common_table_meta::table::TableCompression; - -use crate::io::write::virtual_column_builder::VirtualColumnState; -use crate::io::TableMetaLocationGenerator; -use crate::io::VirtualColumnBuilder; -use crate::statistics::gen_columns_statistics; - -#[derive(Clone)] -pub struct VirtualColumnWriter { - table_compression: TableCompression, - variant_fields: Vec<(usize, TableField)>, - virtual_columns: Vec>, - input_rows: Vec, -} - -impl VirtualColumnWriter { - pub fn create(builder: VirtualColumnBuilder, table_compression: TableCompression) -> Self { - let filed_num = builder.variant_fields.len(); - let virtual_columns = (0..filed_num).map(|_| Vec::new()).collect(); - Self { - table_compression, - variant_fields: builder.variant_fields, - virtual_columns, - input_rows: vec![], - } - } - - pub fn add_block(&mut self, input: &DataBlock) -> Result<()> { - let num_rows = input.num_rows(); - self.input_rows.push(num_rows); - for ((offset, _), virtual_columns) in self - .variant_fields - .iter() - .zip(self.virtual_columns.iter_mut()) - { - let column = input.get_by_offset(*offset).clone(); - virtual_columns.push(column); - } - Ok(()) - } - - pub fn finalize(self, location: &Location) -> Result { - let total_rows: usize = self.input_rows.iter().sum(); - // use a tmp column id to generate statistics for virtual columns. 
- let mut tmp_column_id = 0; - let mut virtual_column_names = Vec::new(); - let mut virtual_fields = Vec::new(); - let mut virtual_columns = Vec::new(); - - 'FOR: for ((_, source_field), columns) in self - .variant_fields - .into_iter() - .zip(self.virtual_columns.into_iter()) - { - let source_column_id = source_field.column_id; - let virtual_field_num = virtual_fields.len(); - let mut virtual_values = BTreeMap::new(); - let mut sample_done = false; - - for (column, &num_rows) in columns.iter().zip(&self.input_rows) { - let mut start_pos = 0; - if !sample_done { - // use first 10 rows as sample to check whether the block is suitable for generating virtual columns - let sample_rows = num_rows.min(10); - for row in 0..sample_rows { - VirtualColumnBuilder::extract_virtual_values( - column, - row, - virtual_field_num, - &mut virtual_values, - ); - } - - if VirtualColumnBuilder::check_sample_virtual_values( - sample_rows, - &mut virtual_values, - ) { - continue 'FOR; - } - start_pos = sample_rows; - sample_done = true; - } - for row in start_pos..num_rows { - VirtualColumnBuilder::extract_virtual_values( - column, - row, - virtual_field_num, - &mut virtual_values, - ); - } - } - VirtualColumnBuilder::discard_virtual_values( - total_rows, - virtual_field_num, - &mut virtual_values, - ); - if virtual_values.is_empty() { - continue; - } - - let value_types = VirtualColumnBuilder::inference_data_type(&virtual_values); - for ((key_paths, vals), val_type) in - virtual_values.into_iter().zip(value_types.into_iter()) - { - let (virtual_type, column) = VirtualColumnBuilder::build_column(&val_type, vals); - let virtual_table_type = infer_schema_type(&virtual_type).unwrap(); - virtual_columns.push(column.into()); - - let key_name = VirtualColumnBuilder::key_path_to_string(key_paths); - let virtual_name = format!("{}{}", source_field.name, key_name); - - let virtual_field = TableField::new_from_column_id( - &virtual_name, - virtual_table_type, - tmp_column_id, - ); - virtual_fields.push(virtual_field); - tmp_column_id += 1; - - virtual_column_names.push((source_column_id, key_name, val_type)); - } - if virtual_fields.len() >= VIRTUAL_COLUMNS_LIMIT { - break; - } - } - - // There are no suitable virtual columns, returning empty data. 
- if virtual_fields.is_empty() { - let draft_virtual_block_meta = DraftVirtualBlockMeta { - virtual_column_metas: vec![], - virtual_column_size: 0, - virtual_location: ("".to_string(), 0), - }; - - return Ok(VirtualColumnState { - data: vec![], - draft_virtual_block_meta, - }); - } - - let virtual_block_schema = TableSchemaRefExt::create(virtual_fields); - let virtual_block = DataBlock::new(virtual_columns, total_rows); - - let columns_statistics = - gen_columns_statistics(&virtual_block, None, &virtual_block_schema)?; - - let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE); - let file_meta = blocks_to_parquet( - virtual_block_schema.as_ref(), - vec![virtual_block], - &mut data, - self.table_compression, - )?; - let draft_virtual_column_metas = VirtualColumnBuilder::file_meta_to_virtual_column_metas( - file_meta, - virtual_column_names, - columns_statistics, - )?; - - let data_size = data.len() as u64; - let virtual_column_location = - TableMetaLocationGenerator::gen_virtual_block_location(&location.0); - - let draft_virtual_block_meta = DraftVirtualBlockMeta { - virtual_column_metas: draft_virtual_column_metas, - virtual_column_size: data_size, - virtual_location: (virtual_column_location, 0), - }; - - Ok(VirtualColumnState { - data, - draft_virtual_block_meta, - }) - } -} From 41c78546583bc5f2ed5ee8c1e0450aa26f374647 Mon Sep 17 00:00:00 2001 From: zhyass Date: Wed, 9 Jul 2025 02:10:05 +0800 Subject: [PATCH 09/12] update --- .../io/write/stream/column_ndv_estimator.rs | 93 +++++++++++++++---- .../write/stream/column_statistics_state.rs | 3 +- .../storages/fuse/src/io/write/stream/mod.rs | 1 + 3 files changed, 76 insertions(+), 21 deletions(-) diff --git a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs index cb138c53a1d8c..46f101a1e44c4 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs @@ -21,49 +21,102 @@ use databend_common_expression::types::DateType; use databend_common_expression::types::Decimal128Type; use databend_common_expression::types::Decimal256Type; use databend_common_expression::types::Decimal64Type; +use databend_common_expression::types::Float32Type; +use databend_common_expression::types::Float64Type; +use databend_common_expression::types::Int16Type; +use databend_common_expression::types::Int32Type; +use databend_common_expression::types::Int64Type; +use databend_common_expression::types::Int8Type; use databend_common_expression::types::NumberDataType; -use databend_common_expression::types::NumberType; use databend_common_expression::types::StringType; use databend_common_expression::types::TimestampType; +use databend_common_expression::types::UInt16Type; +use databend_common_expression::types::UInt32Type; +use databend_common_expression::types::UInt64Type; +use databend_common_expression::types::UInt8Type; use databend_common_expression::types::ValueType; -use databend_common_expression::with_number_mapped_type; use databend_common_expression::Column; use databend_common_expression::ScalarRef; use databend_common_expression::SELECTIVITY_THRESHOLD; use databend_storages_common_table_meta::meta::ColumnDistinctHLL; +use enum_dispatch::enum_dispatch; -pub trait ColumnNDVEstimator: Send + Sync { +#[enum_dispatch] +pub trait ColumnNDVEstimatorOps: Send + Sync { fn update_column(&mut self, column: &Column); fn update_scalar(&mut self, scalar: &ScalarRef); fn 
finalize(&self) -> u64; } -pub fn create_column_ndv_estimator(data_type: &DataType) -> Box { +#[enum_dispatch(ColumnNDVEstimatorOps)] +pub enum ColumnNDVEstimator { + Int8(ColumnNDVEstimatorImpl), + Int16(ColumnNDVEstimatorImpl), + Int32(ColumnNDVEstimatorImpl), + Int64(ColumnNDVEstimatorImpl), + UInt8(ColumnNDVEstimatorImpl), + UInt16(ColumnNDVEstimatorImpl), + UInt32(ColumnNDVEstimatorImpl), + UInt64(ColumnNDVEstimatorImpl), + Float32(ColumnNDVEstimatorImpl), + Float64(ColumnNDVEstimatorImpl), + String(ColumnNDVEstimatorImpl), + Date(ColumnNDVEstimatorImpl), + Timestamp(ColumnNDVEstimatorImpl), + Decimal64(ColumnNDVEstimatorImpl), + Decimal128(ColumnNDVEstimatorImpl), + Decimal256(ColumnNDVEstimatorImpl), +} + +pub fn create_column_ndv_estimator(data_type: &DataType) -> ColumnNDVEstimator { let inner_type = data_type.remove_nullable(); - with_number_mapped_type!(|NUM_TYPE| match inner_type { - DataType::Number(NumberDataType::NUM_TYPE) => { - ColumnNDVEstimatorImpl::>::create() + match inner_type { + DataType::Number(NumberDataType::Int8) => { + ColumnNDVEstimator::Int8(ColumnNDVEstimatorImpl::::new()) + } + DataType::Number(NumberDataType::Int16) => { + ColumnNDVEstimator::Int16(ColumnNDVEstimatorImpl::::new()) + } + DataType::Number(NumberDataType::Int32) => { + ColumnNDVEstimator::Int32(ColumnNDVEstimatorImpl::::new()) + } + DataType::Number(NumberDataType::Int64) => { + ColumnNDVEstimator::Int64(ColumnNDVEstimatorImpl::::new()) + } + DataType::Number(NumberDataType::UInt8) => { + ColumnNDVEstimator::UInt8(ColumnNDVEstimatorImpl::::new()) } - DataType::String => { - ColumnNDVEstimatorImpl::::create() + DataType::Number(NumberDataType::UInt16) => { + ColumnNDVEstimator::UInt16(ColumnNDVEstimatorImpl::::new()) } - DataType::Date => { - ColumnNDVEstimatorImpl::::create() + DataType::Number(NumberDataType::UInt32) => { + ColumnNDVEstimator::UInt32(ColumnNDVEstimatorImpl::::new()) } + DataType::Number(NumberDataType::UInt64) => { + ColumnNDVEstimator::UInt64(ColumnNDVEstimatorImpl::::new()) + } + DataType::Number(NumberDataType::Float32) => { + ColumnNDVEstimator::Float32(ColumnNDVEstimatorImpl::::new()) + } + DataType::Number(NumberDataType::Float64) => { + ColumnNDVEstimator::Float64(ColumnNDVEstimatorImpl::::new()) + } + DataType::String => ColumnNDVEstimator::String(ColumnNDVEstimatorImpl::::new()), + DataType::Date => ColumnNDVEstimator::Date(ColumnNDVEstimatorImpl::::new()), DataType::Timestamp => { - ColumnNDVEstimatorImpl::::create() + ColumnNDVEstimator::Timestamp(ColumnNDVEstimatorImpl::::new()) } DataType::Decimal(size) => { if size.can_carried_by_64() { - ColumnNDVEstimatorImpl::::create() + ColumnNDVEstimator::Decimal64(ColumnNDVEstimatorImpl::::new()) } else if size.can_carried_by_128() { - ColumnNDVEstimatorImpl::::create() + ColumnNDVEstimator::Decimal128(ColumnNDVEstimatorImpl::::new()) } else { - ColumnNDVEstimatorImpl::::create() + ColumnNDVEstimator::Decimal256(ColumnNDVEstimatorImpl::::new()) } } _ => unreachable!("Unsupported data type: {:?}", data_type), - }) + } } pub struct ColumnNDVEstimatorImpl @@ -80,15 +133,15 @@ where T: ValueType + Send + Sync, for<'a> T::ScalarRef<'a>: Hash, { - pub fn create() -> Box { - Box::new(Self { + pub fn new() -> Self { + Self { hll: ColumnDistinctHLL::new(), _phantom: Default::default(), - }) + } } } -impl ColumnNDVEstimator for ColumnNDVEstimatorImpl +impl ColumnNDVEstimatorOps for ColumnNDVEstimatorImpl where T: ValueType + Send + Sync, for<'a> T::ScalarRef<'a>: Hash, diff --git 
a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs index cd295526a9241..6e7b5d0f87704 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs @@ -25,13 +25,14 @@ use databend_storages_common_table_meta::meta::StatisticsOfColumns; use crate::io::write::stream::create_column_ndv_estimator; use crate::io::write::stream::create_column_stats_builder; use crate::io::write::stream::ColumnNDVEstimator; +use crate::io::write::stream::ColumnNDVEstimatorOps; use crate::io::write::stream::ColumnStatisticsBuilder; use crate::io::write::stream::ColumnStatsOps; use crate::statistics::traverse_values_dfs; pub struct ColumnStatisticsState { col_stats: HashMap, - distinct_columns: HashMap>, + distinct_columns: HashMap, } impl ColumnStatisticsState { diff --git a/src/query/storages/fuse/src/io/write/stream/mod.rs b/src/query/storages/fuse/src/io/write/stream/mod.rs index 451b82e998fd0..f0c7365b5ba01 100644 --- a/src/query/storages/fuse/src/io/write/stream/mod.rs +++ b/src/query/storages/fuse/src/io/write/stream/mod.rs @@ -22,6 +22,7 @@ pub(crate) use block_builder::StreamBlockBuilder; pub(crate) use block_builder::StreamBlockProperties; pub(crate) use column_ndv_estimator::create_column_ndv_estimator; pub(crate) use column_ndv_estimator::ColumnNDVEstimator; +pub(crate) use column_ndv_estimator::ColumnNDVEstimatorOps; pub(crate) use column_statistics_builder::create_column_stats_builder; pub(crate) use column_statistics_builder::ColumnStatisticsBuilder; pub(crate) use column_statistics_builder::ColumnStatsOps; From c85b7b4a55705c2fbd9955c2c150ea6ca799ebe8 Mon Sep 17 00:00:00 2001 From: zhyass Date: Wed, 9 Jul 2025 03:23:33 +0800 Subject: [PATCH 10/12] simple --- Cargo.lock | 1 + src/query/storages/fuse/Cargo.toml | 1 + .../io/write/stream/column_ndv_estimator.rs | 44 +++++++------------ .../write/stream/column_statistics_builder.rs | 44 +++++++------------ 4 files changed, 32 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8eebaad296bb2..134b4890e7cdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4351,6 +4351,7 @@ dependencies = [ "opendal", "parking_lot 0.12.3", "parquet", + "paste", "rand 0.8.5", "serde", "serde_json", diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index 1e704da63bad9..0601548e6e4bf 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -62,6 +62,7 @@ match-template = { workspace = true } opendal = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true } +paste = { workspace = true } rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs index 46f101a1e44c4..4410f06feba42 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs @@ -35,6 +35,7 @@ use databend_common_expression::types::UInt32Type; use databend_common_expression::types::UInt64Type; use databend_common_expression::types::UInt8Type; use databend_common_expression::types::ValueType; +use databend_common_expression::with_number_type; use databend_common_expression::Column; use databend_common_expression::ScalarRef; use 
databend_common_expression::SELECTIVITY_THRESHOLD; @@ -69,37 +70,22 @@ pub enum ColumnNDVEstimator { } pub fn create_column_ndv_estimator(data_type: &DataType) -> ColumnNDVEstimator { + macro_rules! match_number_type_create { + ($inner_type:expr) => {{ + with_number_type!(|NUM_TYPE| match $inner_type { + NumberDataType::NUM_TYPE => { + paste::paste! { + ColumnNDVEstimator::NUM_TYPE(ColumnNDVEstimatorImpl::<[]>::new()) + } + } + }) + }}; + } + let inner_type = data_type.remove_nullable(); match inner_type { - DataType::Number(NumberDataType::Int8) => { - ColumnNDVEstimator::Int8(ColumnNDVEstimatorImpl::::new()) - } - DataType::Number(NumberDataType::Int16) => { - ColumnNDVEstimator::Int16(ColumnNDVEstimatorImpl::::new()) - } - DataType::Number(NumberDataType::Int32) => { - ColumnNDVEstimator::Int32(ColumnNDVEstimatorImpl::::new()) - } - DataType::Number(NumberDataType::Int64) => { - ColumnNDVEstimator::Int64(ColumnNDVEstimatorImpl::::new()) - } - DataType::Number(NumberDataType::UInt8) => { - ColumnNDVEstimator::UInt8(ColumnNDVEstimatorImpl::::new()) - } - DataType::Number(NumberDataType::UInt16) => { - ColumnNDVEstimator::UInt16(ColumnNDVEstimatorImpl::::new()) - } - DataType::Number(NumberDataType::UInt32) => { - ColumnNDVEstimator::UInt32(ColumnNDVEstimatorImpl::::new()) - } - DataType::Number(NumberDataType::UInt64) => { - ColumnNDVEstimator::UInt64(ColumnNDVEstimatorImpl::::new()) - } - DataType::Number(NumberDataType::Float32) => { - ColumnNDVEstimator::Float32(ColumnNDVEstimatorImpl::::new()) - } - DataType::Number(NumberDataType::Float64) => { - ColumnNDVEstimator::Float64(ColumnNDVEstimatorImpl::::new()) + DataType::Number(num_type) => { + match_number_type_create!(num_type) } DataType::String => ColumnNDVEstimator::String(ColumnNDVEstimatorImpl::::new()), DataType::Date => ColumnNDVEstimator::Date(ColumnNDVEstimatorImpl::::new()), diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs index b56f2a96f2e6a..04a734a675d25 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs @@ -38,6 +38,7 @@ use databend_common_expression::types::UInt32Type; use databend_common_expression::types::UInt64Type; use databend_common_expression::types::UInt8Type; use databend_common_expression::types::ValueType; +use databend_common_expression::with_number_type; use databend_common_expression::Column; use databend_common_expression::Scalar; use databend_common_expression::ScalarRef; @@ -108,36 +109,21 @@ macro_rules! create_builder_for_type { pub fn create_column_stats_builder(data_type: &DataType) -> ColumnStatisticsBuilder { let inner_type = data_type.remove_nullable(); + macro_rules! match_number_type_create { + ($inner_type:expr) => {{ + with_number_type!(|NUM_TYPE| match $inner_type { + NumberDataType::NUM_TYPE => { + paste::paste! 
{ + ColumnStatisticsBuilder::NUM_TYPE(CommonBuilder::<[]>::create(inner_type)) + } + } + }) + }}; + } + match inner_type { - DataType::Number(NumberDataType::Int8) => { - create_builder_for_type!(inner_type, Int8, Int8Type) - } - DataType::Number(NumberDataType::Int16) => { - create_builder_for_type!(inner_type, Int16, Int16Type) - } - DataType::Number(NumberDataType::Int32) => { - create_builder_for_type!(inner_type, Int32, Int32Type) - } - DataType::Number(NumberDataType::Int64) => { - create_builder_for_type!(inner_type, Int64, Int64Type) - } - DataType::Number(NumberDataType::UInt8) => { - create_builder_for_type!(inner_type, UInt8, UInt8Type) - } - DataType::Number(NumberDataType::UInt16) => { - create_builder_for_type!(inner_type, UInt16, UInt16Type) - } - DataType::Number(NumberDataType::UInt32) => { - create_builder_for_type!(inner_type, UInt32, UInt32Type) - } - DataType::Number(NumberDataType::UInt64) => { - create_builder_for_type!(inner_type, UInt64, UInt64Type) - } - DataType::Number(NumberDataType::Float32) => { - create_builder_for_type!(inner_type, Float32, Float32Type) - } - DataType::Number(NumberDataType::Float64) => { - create_builder_for_type!(inner_type, Float64, Float64Type) + DataType::Number(num_type) => { + match_number_type_create!(num_type) } DataType::String => create_builder_for_type!(inner_type, String, StringType), DataType::Date => create_builder_for_type!(inner_type, Date, DateType), From d4354da9566a060c14649a99b2223b84b030ccf6 Mon Sep 17 00:00:00 2001 From: zhyass Date: Wed, 9 Jul 2025 03:25:35 +0800 Subject: [PATCH 11/12] fix --- .../suites/ee/01_ee_system/01_0002_virtual_column.test | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test index 0d86d3d55b737..cd693939daffa 100644 --- a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test +++ b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test @@ -613,9 +613,6 @@ S001 ST001 A Excellent Y S002 ST002 B Good Y S003 ST003 C Average N -statement ok -set enable_block_stream_write = 1 - statement ok CREATE OR REPLACE TABLE test_stream ( id INT, @@ -670,9 +667,6 @@ FROM test_stream; 9 "Richard" 33 "Austin" "hiking" "cycling" 10 "Lisa" 26 "Chicago" "gaming" "reading" -statement ok -set enable_block_stream_write = 0 - statement ok set enable_experimental_virtual_column = 0; From 5680157c22bdac2b3b56aedf5755161554cc7777 Mon Sep 17 00:00:00 2001 From: zhyass Date: Wed, 9 Jul 2025 03:29:43 +0800 Subject: [PATCH 12/12] update --- .../write/stream/column_statistics_builder.rs | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs index 04a734a675d25..e9278b4b1e71a 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs @@ -98,15 +98,6 @@ where } } -macro_rules! 
create_builder_for_type { - ($data_type:expr, $variant:ident, $type:ty) => { - ColumnStatisticsBuilder::$variant(CommonBuilder::<$type>::create($data_type)) - }; - ($data_type:expr, $variant:ident, $type:ty, decimal) => { - ColumnStatisticsBuilder::$variant(DecimalBuilder::<$type>::create($data_type)) - }; -} - pub fn create_column_stats_builder(data_type: &DataType) -> ColumnStatisticsBuilder { let inner_type = data_type.remove_nullable(); macro_rules! match_number_type_create { @@ -125,16 +116,28 @@ pub fn create_column_stats_builder(data_type: &DataType) -> ColumnStatisticsBuil DataType::Number(num_type) => { match_number_type_create!(num_type) } - DataType::String => create_builder_for_type!(inner_type, String, StringType), - DataType::Date => create_builder_for_type!(inner_type, Date, DateType), - DataType::Timestamp => create_builder_for_type!(inner_type, Timestamp, TimestampType), + DataType::String => { + ColumnStatisticsBuilder::String(CommonBuilder::::create(inner_type)) + } + DataType::Date => { + ColumnStatisticsBuilder::Date(CommonBuilder::::create(inner_type)) + } + DataType::Timestamp => { + ColumnStatisticsBuilder::Timestamp(CommonBuilder::::create(inner_type)) + } DataType::Decimal(size) => { if size.can_carried_by_64() { - create_builder_for_type!(inner_type, Decimal64, Decimal64Type, decimal) + ColumnStatisticsBuilder::Decimal64(DecimalBuilder::::create( + inner_type, + )) } else if size.can_carried_by_128() { - create_builder_for_type!(inner_type, Decimal128, Decimal128Type, decimal) + ColumnStatisticsBuilder::Decimal128(DecimalBuilder::::create( + inner_type, + )) } else { - create_builder_for_type!(inner_type, Decimal256, Decimal256Type, decimal) + ColumnStatisticsBuilder::Decimal256(DecimalBuilder::::create( + inner_type, + )) } } _ => unreachable!("Unsupported data type: {:?}", data_type),
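
Patches 10 and 12 finish the cleanup by collapsing the per-numeric-type arms into a single macro template: `with_number_type!` iterates the numeric type names and `paste::paste!` splices each name into both the enum variant and the `[<NUM_TYPE Type>]` concrete type. The sketch below shows the same splice using only the `paste` crate that patch 10 adds as a dependency; `NumberDataType`, `EstimatorImpl`, and the exact-count `HashSet` (a stand-in for the `ColumnDistinctHLL` sketch) are illustrative names, not Databend APIs.

    use std::collections::HashSet;
    use std::hash::Hash;

    use paste::paste;

    // Local aliases playing the role of Int8Type, Int16Type, and so on.
    type Int8Type = i8;
    type Int16Type = i16;
    type Int32Type = i32;
    type Int64Type = i64;

    #[allow(dead_code)]
    #[derive(Debug, Clone, Copy)]
    enum NumberDataType {
        Int8,
        Int16,
        Int32,
        Int64,
    }

    // Exact distinct counting via HashSet; the real code uses a HyperLogLog
    // sketch instead, which keeps memory bounded regardless of cardinality.
    struct EstimatorImpl<T> {
        seen: HashSet<T>,
    }

    impl<T: Hash + Eq> EstimatorImpl<T> {
        fn new() -> Self {
            Self { seen: HashSet::new() }
        }
        fn update(&mut self, v: T) {
            self.seen.insert(v);
        }
        fn finalize(&self) -> u64 {
            self.seen.len() as u64
        }
    }

    enum Estimator {
        Int8(EstimatorImpl<Int8Type>),
        Int16(EstimatorImpl<Int16Type>),
        Int32(EstimatorImpl<Int32Type>),
        Int64(EstimatorImpl<Int64Type>),
    }

    // One arm template instead of one hand-written arm per type: $name is
    // spliced into the variant name and `[<$name Type>]` into the concrete
    // type, the same trick `with_number_type!` plus `paste::paste!` plays.
    macro_rules! dispatch_number {
        ($dt:expr, [$($name:ident),*]) => {
            match $dt {
                $(NumberDataType::$name => paste! {
                    Estimator::$name(EstimatorImpl::<[<$name Type>]>::new())
                },)*
            }
        };
    }

    fn create_estimator(dt: NumberDataType) -> Estimator {
        dispatch_number!(dt, [Int8, Int16, Int32, Int64])
    }

    fn main() {
        let mut e = create_estimator(NumberDataType::Int64);
        if let Estimator::Int64(inner) = &mut e {
            for v in [1i64, 2, 2, 3] {
                inner.update(v);
            }
            assert_eq!(inner.finalize(), 3);
        }
    }

Because the expansion is an exhaustive match, adding a variant to `NumberDataType` without listing it in the dispatch fails to compile, which is the property that keeps the wide type dispatch in these patches honest.
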