diff --git a/Cargo.lock b/Cargo.lock index 315cd64e0e5e5..134b4890e7cdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4339,6 +4339,7 @@ dependencies = [ "databend-storages-common-table-meta", "divan", "enum-as-inner", + "enum_dispatch", "fastrace", "futures", "futures-util", @@ -4350,6 +4351,7 @@ dependencies = [ "opendal", "parking_lot 0.12.3", "parquet", + "paste", "rand 0.8.5", "serde", "serde_json", diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 9b32bbbc233fd..c1c982a7af915 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1319,13 +1319,6 @@ impl DefaultSettings { scope: SettingScope::Both, range: None, }), - ("enable_block_stream_write", DefaultSettingValue { - value: UserSettingValue::UInt64(0), - desc: "Enables block stream write", - mode: SettingMode::Both, - scope: SettingScope::Both, - range: Some(SettingRange::Numeric(0..=1)), - }), ("trace_sample_rate", DefaultSettingValue { value: UserSettingValue::UInt64(1), desc: "Setting the trace sample rate. The value should be between '0' and '100'", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 0e9a7617a1ac9..14f8c822ce990 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -978,10 +978,6 @@ impl Settings { self.set_setting("optimizer_skip_list".to_string(), v) } - pub fn get_enable_block_stream_write(&self) -> Result { - Ok(self.try_get_u64("enable_block_stream_write")? == 1) - } - pub fn get_statement_queue_ttl_in_seconds(&self) -> Result { self.try_get_u64("statement_queue_ttl_in_seconds") } diff --git a/src/query/storages/common/index/src/bloom_index.rs b/src/query/storages/common/index/src/bloom_index.rs index bf5ec35035e64..59d06f6298a49 100644 --- a/src/query/storages/common/index/src/bloom_index.rs +++ b/src/query/storages/common/index/src/bloom_index.rs @@ -14,6 +14,7 @@ use std::collections::BTreeMap; use std::collections::HashMap; +use std::hash::DefaultHasher; use std::hash::Hasher; use std::ops::ControlFlow; use std::ops::Deref; @@ -35,12 +36,18 @@ use databend_common_expression::types::BinaryType; use databend_common_expression::types::Bitmap; use databend_common_expression::types::Buffer; use databend_common_expression::types::DataType; +use databend_common_expression::types::DateType; use databend_common_expression::types::MapType; use databend_common_expression::types::NullableType; use databend_common_expression::types::Number; use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::NumberType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::TimestampType; use databend_common_expression::types::UInt64Type; +use databend_common_expression::types::ValueType; use databend_common_expression::visit_expr; +use databend_common_expression::with_number_mapped_type; use databend_common_expression::BlockEntry; use databend_common_expression::Column; use databend_common_expression::ColumnBuilder; @@ -349,6 +356,71 @@ impl BloomIndex { Ok(column) } + pub fn calculate_digest_by_type(data_type: &DataType, column: &Column) -> Result> { + let inner_type = data_type.remove_nullable(); + with_number_mapped_type!(|NUM_TYPE| match inner_type { + DataType::Number(NumberDataType::NUM_TYPE) => { + Self::calculate_nullable_column_digests::>(column) + } + DataType::String => { + 
Self::calculate_nullable_column_digests::(column) + } + DataType::Date => { + Self::calculate_nullable_column_digests::(column) + } + DataType::Timestamp => { + Self::calculate_nullable_column_digests::(column) + } + _ => Err(ErrorCode::Internal(format!( + "Unsupported data type: {:?}", + data_type + ))), + }) + } + + #[inline(always)] + fn hash_one(v: &T) -> u64 { + let mut hasher = DefaultHasher::default(); + DFHash::hash(v, &mut hasher); + hasher.finish() + } + + fn calculate_nullable_column_digests(column: &Column) -> Result> + where for<'a> T::ScalarRef<'a>: DFHash { + let (column, validity) = if let Column::Nullable(box inner) = column { + let validity = if inner.validity.null_count() == 0 { + None + } else { + Some(&inner.validity) + }; + (&inner.column, validity) + } else { + (column, None) + }; + + let capacity = validity.map_or(column.len(), |v| v.true_count() + 1); + let mut result = Vec::with_capacity(capacity); + if validity.is_some() { + result.push(0); + } + let column = T::try_downcast_column(column).unwrap(); + if let Some(validity) = validity { + let column_iter = T::iter_column(&column); + let value_iter = column_iter + .zip(validity.iter()) + .filter(|(_, v)| *v) + .map(|(v, _)| v); + for value in value_iter { + result.push(Self::hash_one(&value)); + } + } else { + for value in T::iter_column(&column) { + result.push(Self::hash_one(&value)); + } + } + Ok(result) + } + /// calculate digest for column that may have null values /// /// returns (column, validity) where column is the digest of the column @@ -734,24 +806,8 @@ impl BloomIndexBuilder { } }; - let (column, validity) = - BloomIndex::calculate_nullable_column_digest(&self.func_ctx, &column, &data_type)?; - // create filter per column - if validity.as_ref().map(|v| v.null_count()).unwrap_or(0) > 0 { - let validity = validity.unwrap(); - let it = column.deref().iter().zip(validity.iter()).map( - |(v, b)| { - if !b { - &0 - } else { - v - } - }, - ); - index_column.builder.add_digests(it); - } else { - index_column.builder.add_digests(column.deref()); - } + let column = BloomIndex::calculate_digest_by_type(&data_type, &column)?; + index_column.builder.add_digests(column.deref()); } for index_column in self.ngram_columns.iter_mut() { let field_type = &block.data_type(index_column.index); diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index 811821ec8088d..0601548e6e4bf 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -50,6 +50,7 @@ backoff = { workspace = true, features = ["futures", "tokio"] } bytes = { workspace = true } chrono = { workspace = true } enum-as-inner = { workspace = true } +enum_dispatch = { workspace = true } fastrace = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } @@ -61,6 +62,7 @@ match-template = { workspace = true } opendal = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true } +paste = { workspace = true } rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 96c17992f4141..b99db259c2f39 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -745,6 +745,13 @@ impl FuseTable { ) }) } + + pub fn enable_stream_block_write(&self) -> bool { + matches!(self.storage_format, FuseStorageFormat::Parquet) + && self + .cluster_type() + .is_none_or(|v| matches!(v, 
ClusterType::Hilbert)) + } } #[async_trait::async_trait] diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs index c6593a248c8af..ade34fd17609f 100644 --- a/src/query/storages/fuse/src/io/write/block_writer.rs +++ b/src/query/storages/fuse/src/io/write/block_writer.rs @@ -20,6 +20,8 @@ use std::time::Instant; use chrono::Utc; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; +use databend_common_expression::local_block_meta_serde; +use databend_common_expression::BlockMetaInfo; use databend_common_expression::Column; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; @@ -124,6 +126,7 @@ pub async fn write_data(data: Vec, data_accessor: &Operator, location: &str) Ok(()) } +#[derive(Debug)] pub struct BlockSerialization { pub block_raw_data: Vec, pub block_meta: BlockMeta, @@ -132,6 +135,11 @@ pub struct BlockSerialization { pub virtual_column_state: Option, } +local_block_meta_serde!(BlockSerialization); + +#[typetag::serde(name = "block_serialization_meta")] +impl BlockMetaInfo for BlockSerialization {} + #[derive(Clone)] pub struct BlockBuilder { pub ctx: Arc, diff --git a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs index ec49070a6f08f..738c33ac2f2c3 100644 --- a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs @@ -40,6 +40,7 @@ use opendal::Operator; use crate::io::BlockReader; use crate::FuseStorageFormat; +#[derive(Debug)] pub struct BloomIndexState { pub(crate) data: Vec, pub(crate) size: u64, diff --git a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs index 74377a86108cb..8cf0b5f2355f0 100644 --- a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs @@ -121,6 +121,7 @@ pub fn create_inverted_index_builders(table_meta: &TableMeta) -> Vec, pub(crate) size: u64, diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 7193d988952e8..18262cbd83c25 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -35,6 +35,8 @@ use databend_common_expression::TableSchemaRef; use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID; use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE; use databend_common_native::write::NativeWriter; +use databend_common_native::write::WriteOptions; +use databend_common_sql::executor::physical_plans::MutationKind; use databend_storages_common_index::BloomIndex; use databend_storages_common_index::BloomIndexBuilder; use databend_storages_common_index::Index; @@ -52,7 +54,7 @@ use parquet::file::properties::WriterProperties; use crate::io::create_inverted_index_builders; use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder; use crate::io::write::stream::cluster_statistics::ClusterStatisticsState; -use crate::io::write::stream::column_statistics::ColumnStatisticsState; +use crate::io::write::stream::ColumnStatisticsState; use crate::io::write::InvertedIndexState; use crate::io::BlockSerialization; use crate::io::BloomIndexState; @@ -188,7 +190,7 @@ impl StreamBlockBuilder { let writer = NativeWriter::new( buffer, 
properties.source_schema.as_ref().clone(), - databend_common_native::write::WriteOptions { + WriteOptions { default_compression: properties.write_settings.table_compression.into(), max_page_size: Some(properties.write_settings.max_page_size), default_compress_ratio, @@ -213,21 +215,7 @@ impl StreamBlockBuilder { &properties.ngram_args, )?; - let virtual_column_builder = if properties - .ctx - .get_settings() - .get_enable_refresh_virtual_column_after_write() - .unwrap_or_default() - && properties.support_virtual_columns - { - VirtualColumnBuilder::try_create( - properties.ctx.clone(), - properties.source_schema.clone(), - ) - .ok() - } else { - None - }; + let virtual_column_builder = properties.virtual_column_builder.clone(); let cluster_stats_state = ClusterStatisticsState::new(properties.cluster_stats_builder.clone()); @@ -254,7 +242,7 @@ impl StreamBlockBuilder { pub fn need_flush(&self) -> bool { let file_size = self.block_writer.compressed_size(); self.row_count >= self.properties.block_thresholds.min_rows_per_block - || self.block_size >= self.properties.block_thresholds.max_bytes_per_block + || self.block_size >= self.properties.block_thresholds.min_bytes_per_block * 2 || (file_size >= self.properties.block_thresholds.min_compressed_per_block && self.block_size >= self.properties.block_thresholds.min_bytes_per_block) } @@ -362,7 +350,10 @@ impl StreamBlockBuilder { compression: self.properties.write_settings.table_compression.into(), inverted_index_size, create_on: Some(Utc::now()), - ngram_filter_index_size: None, + ngram_filter_index_size: bloom_index_state + .as_ref() + .map(|v| v.ngram_size) + .unwrap_or_default(), virtual_block_meta: None, }; let serialized = BlockSerialization { @@ -385,29 +376,36 @@ pub struct StreamBlockProperties { source_schema: TableSchemaRef, cluster_stats_builder: Arc, - stats_columns: Vec, - distinct_columns: Vec, + stats_columns: Vec<(ColumnId, DataType)>, + distinct_columns: Vec<(ColumnId, DataType)>, bloom_columns_map: BTreeMap, ngram_args: Vec, inverted_index_builders: Vec, + virtual_column_builder: Option, table_meta_timestamps: TableMetaTimestamps, - support_virtual_columns: bool, } impl StreamBlockProperties { pub fn try_create( ctx: Arc, table: &FuseTable, + kind: MutationKind, table_meta_timestamps: TableMetaTimestamps, ) -> Result> { // remove virtual computed fields. - let fields = table + let mut fields = table .schema() .fields() .iter() .filter(|f| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_)))) .cloned() .collect::>(); + if !matches!(kind, MutationKind::Insert | MutationKind::Replace) { + // add stream fields. 
+ for stream_column in table.stream_columns().iter() { + fields.push(stream_column.table_field()); + } + } let source_schema = Arc::new(TableSchema { fields, @@ -429,6 +427,16 @@ impl StreamBlockProperties { .collect::>(); let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta); + let virtual_column_builder = if ctx + .get_settings() + .get_enable_refresh_virtual_column_after_write() + .unwrap_or_default() + && table.support_virtual_columns() + { + VirtualColumnBuilder::try_create(ctx.clone(), source_schema.clone()).ok() + } else { + None + }; let cluster_stats_builder = ClusterStatisticsBuilder::try_create(table, ctx.clone(), &source_schema)?; @@ -438,16 +446,15 @@ impl StreamBlockProperties { let leaf_fields = source_schema.leaf_fields(); for field in leaf_fields.iter() { let column_id = field.column_id(); - if RangeIndex::supported_type(&DataType::from(field.data_type())) - && column_id != ORIGIN_BLOCK_ROW_NUM_COLUMN_ID + let data_type = DataType::from(field.data_type()); + if RangeIndex::supported_type(&data_type) && column_id != ORIGIN_BLOCK_ROW_NUM_COLUMN_ID { - stats_columns.push(column_id); + stats_columns.push((column_id, data_type.clone())); if !bloom_column_ids.contains(&column_id) { - distinct_columns.push(column_id); + distinct_columns.push((column_id, data_type)); } } } - let support_virtual_columns = table.support_virtual_columns(); Ok(Arc::new(StreamBlockProperties { ctx, meta_locations: table.meta_location_generator().clone(), @@ -455,13 +462,13 @@ impl StreamBlockProperties { source_schema, write_settings, cluster_stats_builder, + virtual_column_builder, stats_columns, distinct_columns, bloom_columns_map, ngram_args, inverted_index_builders, table_meta_timestamps, - support_virtual_columns, })) } } diff --git a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs new file mode 100644 index 0000000000000..4410f06feba42 --- /dev/null +++ b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs @@ -0,0 +1,182 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::hash::Hash; +use std::marker::PhantomData; + +use databend_common_expression::types::boolean::TrueIdxIter; +use databend_common_expression::types::DataType; +use databend_common_expression::types::DateType; +use databend_common_expression::types::Decimal128Type; +use databend_common_expression::types::Decimal256Type; +use databend_common_expression::types::Decimal64Type; +use databend_common_expression::types::Float32Type; +use databend_common_expression::types::Float64Type; +use databend_common_expression::types::Int16Type; +use databend_common_expression::types::Int32Type; +use databend_common_expression::types::Int64Type; +use databend_common_expression::types::Int8Type; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::TimestampType; +use databend_common_expression::types::UInt16Type; +use databend_common_expression::types::UInt32Type; +use databend_common_expression::types::UInt64Type; +use databend_common_expression::types::UInt8Type; +use databend_common_expression::types::ValueType; +use databend_common_expression::with_number_type; +use databend_common_expression::Column; +use databend_common_expression::ScalarRef; +use databend_common_expression::SELECTIVITY_THRESHOLD; +use databend_storages_common_table_meta::meta::ColumnDistinctHLL; +use enum_dispatch::enum_dispatch; + +#[enum_dispatch] +pub trait ColumnNDVEstimatorOps: Send + Sync { + fn update_column(&mut self, column: &Column); + fn update_scalar(&mut self, scalar: &ScalarRef); + fn finalize(&self) -> u64; +} + +#[enum_dispatch(ColumnNDVEstimatorOps)] +pub enum ColumnNDVEstimator { + Int8(ColumnNDVEstimatorImpl), + Int16(ColumnNDVEstimatorImpl), + Int32(ColumnNDVEstimatorImpl), + Int64(ColumnNDVEstimatorImpl), + UInt8(ColumnNDVEstimatorImpl), + UInt16(ColumnNDVEstimatorImpl), + UInt32(ColumnNDVEstimatorImpl), + UInt64(ColumnNDVEstimatorImpl), + Float32(ColumnNDVEstimatorImpl), + Float64(ColumnNDVEstimatorImpl), + String(ColumnNDVEstimatorImpl), + Date(ColumnNDVEstimatorImpl), + Timestamp(ColumnNDVEstimatorImpl), + Decimal64(ColumnNDVEstimatorImpl), + Decimal128(ColumnNDVEstimatorImpl), + Decimal256(ColumnNDVEstimatorImpl), +} + +pub fn create_column_ndv_estimator(data_type: &DataType) -> ColumnNDVEstimator { + macro_rules! match_number_type_create { + ($inner_type:expr) => {{ + with_number_type!(|NUM_TYPE| match $inner_type { + NumberDataType::NUM_TYPE => { + paste::paste! 
{ + ColumnNDVEstimator::NUM_TYPE(ColumnNDVEstimatorImpl::<[]>::new()) + } + } + }) + }}; + } + + let inner_type = data_type.remove_nullable(); + match inner_type { + DataType::Number(num_type) => { + match_number_type_create!(num_type) + } + DataType::String => ColumnNDVEstimator::String(ColumnNDVEstimatorImpl::::new()), + DataType::Date => ColumnNDVEstimator::Date(ColumnNDVEstimatorImpl::::new()), + DataType::Timestamp => { + ColumnNDVEstimator::Timestamp(ColumnNDVEstimatorImpl::::new()) + } + DataType::Decimal(size) => { + if size.can_carried_by_64() { + ColumnNDVEstimator::Decimal64(ColumnNDVEstimatorImpl::::new()) + } else if size.can_carried_by_128() { + ColumnNDVEstimator::Decimal128(ColumnNDVEstimatorImpl::::new()) + } else { + ColumnNDVEstimator::Decimal256(ColumnNDVEstimatorImpl::::new()) + } + } + _ => unreachable!("Unsupported data type: {:?}", data_type), + } +} + +pub struct ColumnNDVEstimatorImpl +where + T: ValueType + Send + Sync, + for<'a> T::ScalarRef<'a>: Hash, +{ + hll: ColumnDistinctHLL, + _phantom: PhantomData, +} + +impl ColumnNDVEstimatorImpl +where + T: ValueType + Send + Sync, + for<'a> T::ScalarRef<'a>: Hash, +{ + pub fn new() -> Self { + Self { + hll: ColumnDistinctHLL::new(), + _phantom: Default::default(), + } + } +} + +impl ColumnNDVEstimatorOps for ColumnNDVEstimatorImpl +where + T: ValueType + Send + Sync, + for<'a> T::ScalarRef<'a>: Hash, +{ + fn update_column(&mut self, column: &Column) { + let (column, validity) = match column { + Column::Nullable(box inner) => { + let validity = if inner.validity.null_count() == 0 { + None + } else { + Some(&inner.validity) + }; + (&inner.column, validity) + } + Column::Null { .. } => return, + column => (column, None), + }; + + let column = T::try_downcast_column(column).unwrap(); + if let Some(v) = validity { + if v.true_count() as f64 / v.len() as f64 >= SELECTIVITY_THRESHOLD { + for (data, valid) in T::iter_column(&column).zip(v.iter()) { + if valid { + self.hll.add_object(&data); + } + } + } else { + TrueIdxIter::new(v.len(), Some(v)).for_each(|idx| { + let val = unsafe { T::index_column_unchecked(&column, idx) }; + self.hll.add_object(&val); + }) + } + } else { + for value in T::iter_column(&column) { + self.hll.add_object(&value); + } + } + } + + fn update_scalar(&mut self, scalar: &ScalarRef) { + if matches!(scalar, ScalarRef::Null) { + return; + } + + let val = T::try_downcast_scalar(scalar).unwrap(); + self.hll.add_object(&val); + } + + fn finalize(&self) -> u64 { + self.hll.count() as u64 + } +} diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics.rs deleted file mode 100644 index 402504e3a207e..0000000000000 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics.rs +++ /dev/null @@ -1,253 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashMap; - -use databend_common_exception::Result; -use databend_common_expression::types::AccessType; -use databend_common_expression::types::DataType; -use databend_common_expression::types::DateType; -use databend_common_expression::types::DecimalColumn; -use databend_common_expression::types::DecimalScalar; -use databend_common_expression::types::NumberDataType; -use databend_common_expression::types::NumberType; -use databend_common_expression::types::StringType; -use databend_common_expression::types::TimestampType; -use databend_common_expression::with_number_mapped_type; -use databend_common_expression::Column; -use databend_common_expression::ColumnId; -use databend_common_expression::DataBlock; -use databend_common_expression::Scalar; -use databend_common_expression::ScalarRef; -use databend_common_expression::TableSchemaRef; -use databend_common_expression::Value; -use databend_common_functions::aggregates::eval_aggr; -use databend_storages_common_table_meta::meta::ColumnDistinctHLL; -use databend_storages_common_table_meta::meta::ColumnStatistics; -use databend_storages_common_table_meta::meta::StatisticsOfColumns; - -use crate::statistics::reducers::reduce_column_statistics; -use crate::statistics::traverse_values_dfs; -use crate::statistics::Trim; - -pub struct ColumnStatisticsState { - col_stats: HashMap>, - distinct_columns: HashMap, -} - -impl ColumnStatisticsState { - pub fn new(stats_columns: &[ColumnId], distinct_columns: &[ColumnId]) -> Self { - let col_stats = stats_columns - .iter() - .map(|&col_id| (col_id, Vec::new())) - .collect(); - - let distinct_columns = distinct_columns - .iter() - .map(|&col_id| (col_id, ColumnDistinctHLL::default())) - .collect(); - - Self { - col_stats, - distinct_columns, - } - } - - pub fn add_block(&mut self, schema: &TableSchemaRef, data_block: &DataBlock) -> Result<()> { - let rows = data_block.num_rows(); - let leaves = traverse_values_dfs(data_block.columns(), schema.fields())?; - for (column_id, col, data_type) in leaves { - match col { - Value::Scalar(s) => { - let unset_bits = if s == Scalar::Null { rows } else { 0 }; - // when we read it back from parquet, it is a Column instead of Scalar - let in_memory_size = s.as_ref().estimated_scalar_repeat_size(rows, &data_type); - let col_stats = ColumnStatistics::new( - s.clone(), - s.clone(), - unset_bits as u64, - in_memory_size as u64, - None, - ); - if let Some(hll) = self.distinct_columns.get_mut(&column_id) { - scalar_update_hll_cardinality(&s.as_ref(), &data_type, hll); - } - self.col_stats.get_mut(&column_id).unwrap().push(col_stats); - } - Value::Column(col) => { - // later, during the evaluation of expressions, name of field does not matter - let mut min = Scalar::Null; - let mut max = Scalar::Null; - - let (mins, _) = eval_aggr("min", vec![], &[col.clone().into()], rows, vec![])?; - if mins.len() > 0 { - min = if let Some(v) = mins.index(0) { - // safe upwrap. 
- v.to_owned().trim_min().unwrap() - } else { - self.col_stats.remove(&column_id); - continue; - } - } - - let (maxs, _) = eval_aggr("max", vec![], &[col.clone().into()], rows, vec![])?; - if maxs.len() > 0 { - max = if let Some(v) = maxs.index(0) { - if let Some(v) = v.to_owned().trim_max() { - v - } else { - self.col_stats.remove(&column_id); - continue; - } - } else { - self.col_stats.remove(&column_id); - continue; - } - } - - let (is_all_null, bitmap) = col.validity(); - let unset_bits = match (is_all_null, bitmap) { - (true, _) => rows, - (false, Some(bitmap)) => bitmap.null_count(), - (false, None) => 0, - }; - let in_memory_size = col.memory_size() as u64; - let col_stats = - ColumnStatistics::new(min, max, unset_bits as u64, in_memory_size, None); - self.col_stats.get_mut(&column_id).unwrap().push(col_stats); - - // use distinct count calculated by the xor hash function to avoid repetitive operation. - if let Some(hll) = self.distinct_columns.get_mut(&column_id) { - column_update_hll_cardinality(&col, &data_type, hll); - } - } - } - } - Ok(()) - } - - pub fn finalize( - self, - column_distinct_count: HashMap, - ) -> Result { - let mut statistics = StatisticsOfColumns::with_capacity(self.col_stats.len()); - for (id, stats) in &self.col_stats { - let mut col_stats = reduce_column_statistics(stats); - if let Some(count) = column_distinct_count.get(id) { - col_stats.distinct_of_values = Some(*count as u64); - } else if let Some(hll) = self.distinct_columns.get(id) { - col_stats.distinct_of_values = Some(hll.count() as u64); - } - statistics.insert(*id, col_stats); - } - Ok(statistics) - } -} - -fn column_update_hll_cardinality(col: &Column, ty: &DataType, hll: &mut ColumnDistinctHLL) { - if let DataType::Nullable(inner) = ty { - let col = col.as_nullable().unwrap(); - for (i, v) in col.validity.iter().enumerate() { - if v { - let scalar = col.column.index(i).unwrap(); - scalar_update_hll_cardinality(&scalar, inner, hll); - } - } - return; - } - - with_number_mapped_type!(|NUM_TYPE| match ty { - DataType::Number(NumberDataType::NUM_TYPE) => { - let col = NumberType::::try_downcast_column(col).unwrap(); - for v in col.iter() { - hll.add_object(v); - } - } - DataType::String => { - let col = StringType::try_downcast_column(col).unwrap(); - for v in col.iter() { - hll.add_object(&v); - } - } - DataType::Date => { - let col = DateType::try_downcast_column(col).unwrap(); - for v in col.iter() { - hll.add_object(v); - } - } - DataType::Timestamp => { - let col = TimestampType::try_downcast_column(col).unwrap(); - for v in col.iter() { - hll.add_object(v); - } - } - DataType::Decimal(_) => { - match col { - Column::Decimal(DecimalColumn::Decimal64(col, _)) => { - for v in col.iter() { - hll.add_object(v); - } - } - Column::Decimal(DecimalColumn::Decimal128(col, _)) => { - for v in col.iter() { - hll.add_object(v); - } - } - Column::Decimal(DecimalColumn::Decimal256(col, _)) => { - for v in col.iter() { - hll.add_object(v); - } - } - _ => unreachable!(), - }; - } - _ => unreachable!("Unsupported data type: {:?}", ty), - }); -} - -fn scalar_update_hll_cardinality(scalar: &ScalarRef, ty: &DataType, hll: &mut ColumnDistinctHLL) { - if matches!(scalar, ScalarRef::Null) { - return; - } - - let ty = ty.remove_nullable(); - - with_number_mapped_type!(|NUM_TYPE| match ty { - DataType::Number(NumberDataType::NUM_TYPE) => { - let val = NumberType::::try_downcast_scalar(scalar).unwrap(); - hll.add_object(&val); - } - DataType::String => { - let val = StringType::try_downcast_scalar(scalar).unwrap(); - 
hll.add_object(&val); - } - DataType::Date => { - let val = DateType::try_downcast_scalar(scalar).unwrap(); - hll.add_object(&val); - } - DataType::Timestamp => { - let val = TimestampType::try_downcast_scalar(scalar).unwrap(); - hll.add_object(&val); - } - DataType::Decimal(_) => { - match scalar { - ScalarRef::Decimal(DecimalScalar::Decimal64(v, _)) => hll.add_object(&v), - ScalarRef::Decimal(DecimalScalar::Decimal128(v, _)) => hll.add_object(&v), - ScalarRef::Decimal(DecimalScalar::Decimal256(v, _)) => hll.add_object(&v), - _ => unreachable!(), - } - } - _ => unreachable!("Unsupported data type: {:?}", ty), - }); -} diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs new file mode 100644 index 0000000000000..e9278b4b1e71a --- /dev/null +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs @@ -0,0 +1,357 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::marker::PhantomData; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::boolean::TrueIdxIter; +use databend_common_expression::types::DataType; +use databend_common_expression::types::DateType; +use databend_common_expression::types::Decimal; +use databend_common_expression::types::Decimal128Type; +use databend_common_expression::types::Decimal256Type; +use databend_common_expression::types::Decimal64Type; +use databend_common_expression::types::Float32Type; +use databend_common_expression::types::Float64Type; +use databend_common_expression::types::Int16Type; +use databend_common_expression::types::Int32Type; +use databend_common_expression::types::Int64Type; +use databend_common_expression::types::Int8Type; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::TimestampType; +use databend_common_expression::types::UInt16Type; +use databend_common_expression::types::UInt32Type; +use databend_common_expression::types::UInt64Type; +use databend_common_expression::types::UInt8Type; +use databend_common_expression::types::ValueType; +use databend_common_expression::with_number_type; +use databend_common_expression::Column; +use databend_common_expression::Scalar; +use databend_common_expression::ScalarRef; +use databend_common_expression::SELECTIVITY_THRESHOLD; +use databend_storages_common_table_meta::meta::ColumnStatistics; +use enum_dispatch::enum_dispatch; + +use crate::statistics::Trim; + +pub type CommonBuilder = GenericColumnStatisticsBuilder; +pub type DecimalBuilder = GenericColumnStatisticsBuilder; + +#[enum_dispatch(ColumnStatsOps)] +pub enum ColumnStatisticsBuilder { + Int8(CommonBuilder), + Int16(CommonBuilder), + Int32(CommonBuilder), + Int64(CommonBuilder), + UInt8(CommonBuilder), + UInt16(CommonBuilder), + UInt32(CommonBuilder), + 
UInt64(CommonBuilder), + Float32(CommonBuilder), + Float64(CommonBuilder), + String(CommonBuilder), + Date(CommonBuilder), + Timestamp(CommonBuilder), + Decimal64(DecimalBuilder), + Decimal128(DecimalBuilder), + Decimal256(DecimalBuilder), +} + +#[enum_dispatch] +pub trait ColumnStatsOps { + fn update_column(&mut self, column: &Column); + fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType); + fn finalize(self) -> Result; +} + +impl ColumnStatsOps for GenericColumnStatisticsBuilder +where + T: ValueType + Send + Sync, + T::Scalar: Send + Sync, + A: ColumnStatisticsAdapter + 'static, + for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, +{ + fn update_column(&mut self, column: &Column) { + GenericColumnStatisticsBuilder::update_column(self, column); + } + + fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType) { + GenericColumnStatisticsBuilder::update_scalar(self, scalar, num_rows, data_type); + } + + fn finalize(self) -> Result { + GenericColumnStatisticsBuilder::finalize(self) + } +} + +pub fn create_column_stats_builder(data_type: &DataType) -> ColumnStatisticsBuilder { + let inner_type = data_type.remove_nullable(); + macro_rules! match_number_type_create { + ($inner_type:expr) => {{ + with_number_type!(|NUM_TYPE| match $inner_type { + NumberDataType::NUM_TYPE => { + paste::paste! { + ColumnStatisticsBuilder::NUM_TYPE(CommonBuilder::<[]>::create(inner_type)) + } + } + }) + }}; + } + + match inner_type { + DataType::Number(num_type) => { + match_number_type_create!(num_type) + } + DataType::String => { + ColumnStatisticsBuilder::String(CommonBuilder::::create(inner_type)) + } + DataType::Date => { + ColumnStatisticsBuilder::Date(CommonBuilder::::create(inner_type)) + } + DataType::Timestamp => { + ColumnStatisticsBuilder::Timestamp(CommonBuilder::::create(inner_type)) + } + DataType::Decimal(size) => { + if size.can_carried_by_64() { + ColumnStatisticsBuilder::Decimal64(DecimalBuilder::::create( + inner_type, + )) + } else if size.can_carried_by_128() { + ColumnStatisticsBuilder::Decimal128(DecimalBuilder::::create( + inner_type, + )) + } else { + ColumnStatisticsBuilder::Decimal256(DecimalBuilder::::create( + inner_type, + )) + } + } + _ => unreachable!("Unsupported data type: {:?}", data_type), + } +} + +pub trait ColumnStatisticsAdapter: Send + Sync { + type Value: Clone + Send + Sync; + + fn scalar_to_value(val: T::ScalarRef<'_>) -> Self::Value; + + fn value_to_scalar(val: Self::Value) -> T::Scalar; + + fn update_value(value: &mut Self::Value, scalar: T::ScalarRef<'_>, ordering: Ordering); +} + +pub struct CommonAdapter; + +impl ColumnStatisticsAdapter for CommonAdapter +where + T: ValueType, + T::Scalar: Send + Sync, + for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, +{ + type Value = T::Scalar; + + fn scalar_to_value(val: T::ScalarRef<'_>) -> Self::Value { + T::to_owned_scalar(val) + } + + fn value_to_scalar(val: Self::Value) -> T::Scalar { + val + } + + fn update_value(value: &mut Self::Value, scalar: T::ScalarRef<'_>, ordering: Ordering) { + if scalar.partial_cmp(&T::to_scalar_ref(value)) == Some(ordering) { + *value = T::to_owned_scalar(scalar); + } + } +} + +pub struct DecimalAdapter; + +impl ColumnStatisticsAdapter for DecimalAdapter +where + T: ValueType, + T::Scalar: Decimal + Send + Sync, + for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, +{ + type Value = ::U64Array; + + fn scalar_to_value(val: T::ScalarRef<'_>) -> Self::Value { + T::Scalar::to_u64_array(T::to_owned_scalar(val)) + } + + fn value_to_scalar(val: Self::Value) -> 
T::Scalar { + T::Scalar::from_u64_array(val) + } + + fn update_value(value: &mut Self::Value, scalar: T::ScalarRef<'_>, ordering: Ordering) { + let val = T::Scalar::from_u64_array(*value); + if scalar.partial_cmp(&T::to_scalar_ref(&val)) == Some(ordering) { + *value = T::Scalar::to_u64_array(T::to_owned_scalar(scalar)); + } + } +} + +pub struct GenericColumnStatisticsBuilder +where + T: ValueType, + A: ColumnStatisticsAdapter, +{ + min: Option, + max: Option, + null_count: usize, + in_memory_size: usize, + data_type: DataType, + + _phantom: PhantomData<(T, A)>, +} + +impl GenericColumnStatisticsBuilder +where + T: ValueType + Send + Sync, + T::Scalar: Send + Sync, + A: ColumnStatisticsAdapter + 'static, + for<'a, 'b> T::ScalarRef<'a>: PartialOrd>, +{ + fn create(data_type: DataType) -> Self { + Self { + min: None, + max: None, + null_count: 0, + in_memory_size: 0, + data_type, + _phantom: PhantomData, + } + } + + fn add_batch<'a, I>(&mut self, mut iter: I) + where I: Iterator> { + let first = iter.next().unwrap(); + let mut min = first.clone(); + let mut max = first; + for v in iter { + if matches!(min.partial_cmp(&v), Some(Ordering::Greater)) { + min = v; + continue; + } + + if matches!(max.partial_cmp(&v), Some(Ordering::Less)) { + max = v; + } + } + + self.add(min, max); + } + + fn add(&mut self, min: T::ScalarRef<'_>, max: T::ScalarRef<'_>) { + if let Some(val) = self.min.as_mut() { + A::update_value(val, min, Ordering::Less); + } else { + self.min = Some(A::scalar_to_value(min)); + } + + if let Some(val) = self.max.as_mut() { + A::update_value(val, max, Ordering::Greater); + } else { + self.max = Some(A::scalar_to_value(max)); + } + } + + fn update_column(&mut self, column: &Column) { + self.in_memory_size += column.memory_size(); + if column.len() == 0 { + return; + } + let (column, validity) = match column { + Column::Nullable(box inner) => { + let validity = if inner.validity.null_count() == 0 { + None + } else { + Some(&inner.validity) + }; + (&inner.column, validity) + } + Column::Null { len } => { + self.null_count += *len; + return; + } + col => (col, None), + }; + self.null_count += validity.map_or(0, |v| v.null_count()); + + let column = T::try_downcast_column(column).unwrap(); + if let Some(v) = validity { + if v.true_count() as f64 / v.len() as f64 >= SELECTIVITY_THRESHOLD { + let column_iter = T::iter_column(&column); + let value_iter = column_iter + .zip(v.iter()) + .filter(|(_, v)| *v) + .map(|(v, _)| v); + self.add_batch(value_iter); + } else { + for idx in TrueIdxIter::new(v.len(), Some(v)) { + let v = unsafe { T::index_column_unchecked(&column, idx) }; + self.add(v.clone(), v); + } + } + } else { + let column_iter = T::iter_column(&column); + self.add_batch(column_iter); + } + } + + fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType) { + // when we read it back from parquet, it is a Column instead of Scalar + self.in_memory_size += scalar.estimated_scalar_repeat_size(num_rows, data_type); + if scalar.is_null() { + self.null_count += num_rows; + return; + } + + let val = T::try_downcast_scalar(scalar).unwrap(); + self.add(val.clone(), val); + } + + fn finalize(self) -> Result { + let min = if let Some(v) = self.min { + let v = A::value_to_scalar(v); + // safe upwrap. 
+ T::upcast_scalar_with_type(v, &self.data_type) + .trim_min() + .unwrap() + } else { + Scalar::Null + }; + let max = if let Some(v) = self.max { + let v = A::value_to_scalar(v); + if let Some(v) = T::upcast_scalar_with_type(v, &self.data_type).trim_max() { + v + } else { + return Err(ErrorCode::Internal("Unable to trim string")); + } + } else { + Scalar::Null + }; + + Ok(ColumnStatistics::new( + min, + max, + self.null_count as u64, + self.in_memory_size as u64, + None, + )) + } +} diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs new file mode 100644 index 0000000000000..6e7b5d0f87704 --- /dev/null +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs @@ -0,0 +1,118 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::ColumnId; +use databend_common_expression::DataBlock; +use databend_common_expression::TableSchemaRef; +use databend_common_expression::Value; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; + +use crate::io::write::stream::create_column_ndv_estimator; +use crate::io::write::stream::create_column_stats_builder; +use crate::io::write::stream::ColumnNDVEstimator; +use crate::io::write::stream::ColumnNDVEstimatorOps; +use crate::io::write::stream::ColumnStatisticsBuilder; +use crate::io::write::stream::ColumnStatsOps; +use crate::statistics::traverse_values_dfs; + +pub struct ColumnStatisticsState { + col_stats: HashMap, + distinct_columns: HashMap, +} + +impl ColumnStatisticsState { + pub fn new( + stats_columns: &[(ColumnId, DataType)], + distinct_columns: &[(ColumnId, DataType)], + ) -> Self { + let col_stats = stats_columns + .iter() + .map(|(col_id, data_type)| (*col_id, create_column_stats_builder(data_type))) + .collect(); + + let distinct_columns = distinct_columns + .iter() + .map(|(col_id, data_type)| (*col_id, create_column_ndv_estimator(data_type))) + .collect(); + + Self { + col_stats, + distinct_columns, + } + } + + pub fn add_block(&mut self, schema: &TableSchemaRef, data_block: &DataBlock) -> Result<()> { + let rows = data_block.num_rows(); + let leaves = traverse_values_dfs(data_block.columns(), schema.fields())?; + for (column_id, col, data_type) in leaves { + match col { + Value::Scalar(s) => { + self.col_stats.get_mut(&column_id).unwrap().update_scalar( + &s.as_ref(), + rows, + &data_type, + ); + if let Some(estimator) = self.distinct_columns.get_mut(&column_id) { + estimator.update_scalar(&s.as_ref()); + } + } + Value::Column(col) => { + self.col_stats + .get_mut(&column_id) + .unwrap() + .update_column(&col); + // use distinct count calculated by the xor hash function to avoid repetitive operation. 
+ if let Some(estimator) = self.distinct_columns.get_mut(&column_id) { + estimator.update_column(&col); + } + } + } + } + Ok(()) + } + + pub fn finalize( + self, + column_distinct_count: HashMap, + ) -> Result { + let mut statistics = StatisticsOfColumns::with_capacity(self.col_stats.len()); + for (id, stats) in self.col_stats { + let mut col_stats = stats.finalize()?; + if let Some(count) = column_distinct_count.get(&id) { + // value calculated by xor hash function include NULL, need to subtract one. + let distinct_of_values = if col_stats.null_count > 0 { + *count as u64 - 1 + } else { + *count as u64 + }; + col_stats.distinct_of_values = Some(distinct_of_values); + } else if let Some(estimator) = self.distinct_columns.get(&id) { + col_stats.distinct_of_values = Some(estimator.finalize()); + } else if col_stats.min == col_stats.max { + // Bloom index will skip the large string column, it also no need to calc distinct values. + if col_stats.min.is_null() { + col_stats.distinct_of_values = Some(0); + } else { + col_stats.distinct_of_values = Some(1); + } + } + statistics.insert(id, col_stats); + } + Ok(statistics) + } +} diff --git a/src/query/storages/fuse/src/io/write/stream/mod.rs b/src/query/storages/fuse/src/io/write/stream/mod.rs index 26d32ee679582..f0c7365b5ba01 100644 --- a/src/query/storages/fuse/src/io/write/stream/mod.rs +++ b/src/query/storages/fuse/src/io/write/stream/mod.rs @@ -14,7 +14,16 @@ mod block_builder; mod cluster_statistics; -mod column_statistics; +mod column_ndv_estimator; +mod column_statistics_builder; +mod column_statistics_state; pub(crate) use block_builder::StreamBlockBuilder; pub(crate) use block_builder::StreamBlockProperties; +pub(crate) use column_ndv_estimator::create_column_ndv_estimator; +pub(crate) use column_ndv_estimator::ColumnNDVEstimator; +pub(crate) use column_ndv_estimator::ColumnNDVEstimatorOps; +pub(crate) use column_statistics_builder::create_column_stats_builder; +pub(crate) use column_statistics_builder::ColumnStatisticsBuilder; +pub(crate) use column_statistics_builder::ColumnStatsOps; +pub(crate) use column_statistics_state::ColumnStatisticsState; diff --git a/src/query/storages/fuse/src/operations/append.rs b/src/query/storages/fuse/src/operations/append.rs index 9316374128528..d1ca44fd41d4c 100644 --- a/src/query/storages/fuse/src/operations/append.rs +++ b/src/query/storages/fuse/src/operations/append.rs @@ -37,10 +37,11 @@ use databend_common_sql::executor::physical_plans::MutationKind; use databend_storages_common_table_meta::meta::TableMetaTimestamps; use databend_storages_common_table_meta::table::ClusterType; +use crate::io::StreamBlockProperties; +use crate::operations::TransformBlockBuilder; use crate::operations::TransformBlockWriter; use crate::operations::TransformSerializeBlock; use crate::statistics::ClusterStatsGenerator; -use crate::FuseStorageFormat; use crate::FuseTable; impl FuseTable { @@ -50,19 +51,22 @@ impl FuseTable { pipeline: &mut Pipeline, table_meta_timestamps: TableMetaTimestamps, ) -> Result<()> { - let enable_stream_block_write = ctx.get_settings().get_enable_block_stream_write()? 
- && matches!(self.storage_format, FuseStorageFormat::Parquet); + let enable_stream_block_write = self.enable_stream_block_write(); if enable_stream_block_write { + let properties = StreamBlockProperties::try_create( + ctx.clone(), + self, + MutationKind::Insert, + table_meta_timestamps, + )?; + pipeline.add_transform(|input, output| { - TransformBlockWriter::try_create( - ctx.clone(), - input, - output, - self, - table_meta_timestamps, - false, - ) + TransformBlockBuilder::try_create(input, output, properties.clone()) })?; + + pipeline.add_async_accumulating_transformer(|| { + TransformBlockWriter::create(ctx.clone(), MutationKind::Insert, self, false) + }); } else { let block_thresholds = self.get_block_thresholds(); build_compact_block_pipeline(pipeline, block_thresholds)?; diff --git a/src/query/storages/fuse/src/operations/common/processors/mod.rs b/src/query/storages/fuse/src/operations/common/processors/mod.rs index e0e3d3b25f25a..d43c569c14016 100644 --- a/src/query/storages/fuse/src/operations/common/processors/mod.rs +++ b/src/query/storages/fuse/src/operations/common/processors/mod.rs @@ -22,6 +22,7 @@ mod transform_serialize_segment; pub use multi_table_insert_commit::CommitMultiTableInsert; pub use sink_commit::CommitSink; +pub use transform_block_writer::TransformBlockBuilder; pub use transform_block_writer::TransformBlockWriter; pub use transform_merge_commit_meta::TransformMergeCommitMeta; pub use transform_mutation_aggregator::TableMutationAggregator; diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs b/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs index ce11a4834c7fb..895f5c7a0ebc5 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs @@ -22,35 +22,36 @@ use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; -use databend_common_io::constants::DEFAULT_BLOCK_ROW_COUNT; +use databend_common_metrics::storage::metrics_inc_recluster_write_block_nums; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::Processor; use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_transforms::AsyncAccumulatingTransform; +use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_storage::MutationStatus; -use databend_storages_common_table_meta::meta::TableMetaTimestamps; use opendal::Operator; use crate::io::BlockSerialization; use crate::io::BlockWriter; use crate::io::StreamBlockBuilder; use crate::io::StreamBlockProperties; +use crate::operations::MutationLogEntry; +use crate::operations::MutationLogs; use crate::FuseTable; -use crate::FUSE_OPT_KEY_ROW_PER_BLOCK; -#[allow(clippy::large_enum_variant)] enum State { Consume, Collect(DataBlock), Serialize, Finalize, Flush, - Write(BlockSerialization), } -pub struct TransformBlockWriter { +pub struct TransformBlockBuilder { state: State, input: Arc, output: Arc, @@ -62,43 +63,27 @@ pub struct TransformBlockWriter { input_data_size: usize, input_num_rows: usize, - dal: Operator, - // Only 
used in multi table insert - table_id: Option, - - max_block_rows: usize, - input_data: VecDeque, + input_data: VecDeque<(usize, DataBlock)>, output_data: Option, } -impl TransformBlockWriter { +impl TransformBlockBuilder { pub fn try_create( - ctx: Arc, input: Arc, output: Arc, - table: &FuseTable, - table_meta_timestamps: TableMetaTimestamps, - with_tid: bool, + properties: Arc, ) -> Result { - let max_block_rows = std::cmp::min( - ctx.get_settings().get_max_block_size()? as usize, - table.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_BLOCK_ROW_COUNT), - ); - let properties = StreamBlockProperties::try_create(ctx, table, table_meta_timestamps)?; - Ok(ProcessorPtr::create(Box::new(TransformBlockWriter { + Ok(ProcessorPtr::create(Box::new(TransformBlockBuilder { state: State::Consume, input, output, properties, builder: None, - dal: table.get_operator(), need_flush: false, - table_id: if with_tid { Some(table.get_id()) } else { None }, input_data: VecDeque::new(), input_data_size: 0, input_num_rows: 0, output_data: None, - max_block_rows, }))) } @@ -111,23 +96,24 @@ impl TransformBlockWriter { Ok(self.builder.as_mut().unwrap()) } - fn calc_max_block_rows(&self, block: &DataBlock) -> usize { - let min_bytes_per_block = self.properties.block_thresholds.min_bytes_per_block; - let block_size = block.estimate_block_size(); - if block_size < min_bytes_per_block { - return self.max_block_rows; - } - let num_rows = block.num_rows(); + fn split_input(&self, input: DataBlock) -> Vec { + let block_size = input.estimate_block_size(); + let num_rows = input.num_rows(); let average_row_size = block_size.div_ceil(num_rows); - let max_rows = min_bytes_per_block.div_ceil(average_row_size); - self.max_block_rows.min(max_rows) + let max_rows = self + .properties + .block_thresholds + .min_bytes_per_block + .div_ceil(average_row_size) + .min(self.properties.block_thresholds.max_rows_per_block); + input.split_by_rows_no_tail(max_rows) } } #[async_trait] -impl Processor for TransformBlockWriter { +impl Processor for TransformBlockBuilder { fn name(&self) -> String { - "TransformBlockWriter".to_string() + "TransformBlockBuilder".to_string() } fn as_any(&mut self) -> &mut dyn Any { @@ -135,15 +121,15 @@ impl Processor for TransformBlockWriter { } fn event(&mut self) -> Result { - match &self.state { - State::Collect(_) | State::Serialize | State::Flush | State::Finalize => { - return Ok(Event::Sync) - } - State::Write(_) => return Ok(Event::Async), - _ => {} + if matches!( + self.state, + State::Collect(_) | State::Serialize | State::Flush | State::Finalize + ) { + return Ok(Event::Sync); } if self.output.is_finished() { + self.input.finish(); return Ok(Event::Finished); } @@ -196,15 +182,16 @@ impl Processor for TransformBlockWriter { State::Collect(block) => { // Check if the datablock is valid, this is needed to ensure data is correct block.check_valid()?; - self.input_data_size += block.estimate_block_size(); self.input_num_rows += block.num_rows(); - let max_rows_per_block = self.calc_max_block_rows(&block); - let blocks = block.split_by_rows_no_tail(max_rows_per_block); - self.input_data.extend(blocks); + for block in self.split_input(block) { + let block_size = block.estimate_block_size(); + self.input_data_size += block_size; + self.input_data.push_back((block_size, block)); + } } State::Serialize => { - while let Some(b) = self.input_data.pop_front() { - self.input_data_size -= b.estimate_block_size(); + while let Some((block_size, b)) = self.input_data.pop_front() { + self.input_data_size -= 
block_size; self.input_num_rows -= b.num_rows(); let builder = self.get_or_create_builder()?; @@ -217,7 +204,7 @@ impl Processor for TransformBlockWriter { } } State::Finalize => { - while let Some(b) = self.input_data.pop_front() { + while let Some((_, b)) = self.input_data.pop_front() { let builder = self.get_or_create_builder()?; builder.write(b)?; } @@ -227,7 +214,7 @@ impl Processor for TransformBlockWriter { let builder = self.builder.take().unwrap(); if !builder.is_empty() { let serialized = builder.finish()?; - self.state = State::Write(serialized); + self.output_data = Some(DataBlock::empty_with_meta(Box::new(serialized))); } self.need_flush = false; } @@ -235,11 +222,41 @@ impl Processor for TransformBlockWriter { } Ok(()) } +} - #[async_backtrace::framed] - async fn async_process(&mut self) -> Result<()> { - match std::mem::replace(&mut self.state, State::Consume) { - State::Write(serialized) => { +pub struct TransformBlockWriter { + kind: MutationKind, + dal: Operator, + ctx: Arc, + // Only used in multi table insert + table_id: Option, +} + +impl TransformBlockWriter { + pub fn create( + ctx: Arc, + kind: MutationKind, + table: &FuseTable, + with_tid: bool, + ) -> Self { + Self { + ctx, + dal: table.get_operator(), + table_id: if with_tid { Some(table.get_id()) } else { None }, + kind, + } + } +} + +#[async_trait::async_trait] +impl AsyncAccumulatingTransform for TransformBlockWriter { + const NAME: &'static str = "TransformBlockWriter"; + + async fn transform(&mut self, data: DataBlock) -> Result> { + debug_assert!(data.is_empty()); + + if let Some(ptr) = data.get_owned_meta() { + if let Some(serialized) = BlockSerialization::downcast_from(ptr) { let extended_block_meta = BlockWriter::write_down(&self.dal, serialized).await?; let bytes = if let Some(draft_virtual_block_meta) = @@ -251,32 +268,45 @@ impl Processor for TransformBlockWriter { extended_block_meta.block_meta.block_size as usize }; - self.properties - .ctx - .get_write_progress() - .incr(&ProgressValues { - rows: extended_block_meta.block_meta.row_count as usize, - bytes, - }); + self.ctx.get_write_progress().incr(&ProgressValues { + rows: extended_block_meta.block_meta.row_count as usize, + bytes, + }); // appending new data block if let Some(tid) = self.table_id { - self.properties.ctx.update_multi_table_insert_status( + self.ctx.update_multi_table_insert_status( tid, extended_block_meta.block_meta.row_count, ); } else { - self.properties.ctx.add_mutation_status(MutationStatus { + self.ctx.add_mutation_status(MutationStatus { insert_rows: extended_block_meta.block_meta.row_count, update_rows: 0, deleted_rows: 0, }); } - self.output_data = Some(DataBlock::empty_with_meta(Box::new(extended_block_meta))); + let output = if matches!(self.kind, MutationKind::Insert) { + DataBlock::empty_with_meta(Box::new(extended_block_meta)) + } else { + if matches!(self.kind, MutationKind::Recluster) { + metrics_inc_recluster_write_block_nums(); + } + + DataBlock::empty_with_meta(Box::new(MutationLogs { + entries: vec![MutationLogEntry::AppendBlock { + block_meta: Arc::new(extended_block_meta), + }], + })) + }; + + return Ok(Some(output)); } - _ => return Err(ErrorCode::Internal("It's a bug.")), } - Ok(()) + + Err(ErrorCode::Internal( + "Cannot downcast meta to BlockSerialization", + )) } } diff --git a/src/query/storages/fuse/src/statistics/column_statistic.rs b/src/query/storages/fuse/src/statistics/column_statistic.rs index 36737dd9e7a62..d02cf23a96883 100644 --- a/src/query/storages/fuse/src/statistics/column_statistic.rs +++ 
b/src/query/storages/fuse/src/statistics/column_statistic.rs @@ -83,37 +83,39 @@ pub fn gen_columns_statistics( let mut min = Scalar::Null; let mut max = Scalar::Null; - let (mins, _) = eval_aggr("min", vec![], &[col.clone().into()], rows, vec![])?; - let (maxs, _) = eval_aggr("max", vec![], &[col.clone().into()], rows, vec![])?; + if col.len() > 0 { + let (mins, _) = eval_aggr("min", vec![], &[col.clone().into()], rows, vec![])?; + let (maxs, _) = eval_aggr("max", vec![], &[col.clone().into()], rows, vec![])?; - if mins.len() > 0 { - min = if let Some(v) = mins.index(0) { - if let Some(v) = v.to_owned().trim_min() { - v + if mins.len() > 0 { + min = if let Some(v) = mins.index(0) { + if let Some(v) = v.to_owned().trim_min() { + v + } else { + continue; + } } else { continue; } - } else { - continue; } - } - if maxs.len() > 0 { - max = if let Some(v) = maxs.index(0) { - if let Some(v) = v.to_owned().trim_max() { - v + if maxs.len() > 0 { + max = if let Some(v) = maxs.index(0) { + if let Some(v) = v.to_owned().trim_max() { + v + } else { + continue; + } } else { continue; } - } else { - continue; } } let (is_all_null, bitmap) = col.validity(); let unset_bits = match (is_all_null, bitmap) { - (true, _) => rows, - (false, Some(bitmap)) => bitmap.null_count(), + (_, Some(bitmap)) => bitmap.null_count(), + (true, None) => rows, (false, None) => 0, }; diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0004_remote_insert_into_select.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0004_remote_insert_into_select.test index 5e399dc1b406f..5ad4e316896d0 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0004_remote_insert_into_select.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0004_remote_insert_into_select.test @@ -7,9 +7,6 @@ CREATE DATABASE db_09_004 statement ok USE db_09_004 -statement ok -set enable_block_stream_write = 1 - statement ok CREATE TABLE IF NOT EXISTS t1(a UInt8 not null, b UInt64 not null, c Int8 not null, d Int64 not null, e Date not null, f Date not null, g DateTime not null, h String not null) Engine = Fuse diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test index 43a2b262ca2f9..c19a27a9e8890 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test @@ -7,9 +7,6 @@ CREATE DATABASE db_09_0008 statement ok USE db_09_0008 -statement ok -set enable_block_stream_write = 1 - statement ok create table t(a uint64 not null) diff --git a/tests/sqllogictests/suites/base/issues/issue_18275.test b/tests/sqllogictests/suites/base/issues/issue_18275.test index 36217cda4b3ca..ce895d228329e 100644 --- a/tests/sqllogictests/suites/base/issues/issue_18275.test +++ b/tests/sqllogictests/suites/base/issues/issue_18275.test @@ -14,9 +14,6 @@ CREATE OR REPLACE TABLE product_test ( stock INT ); -statement ok -set enable_block_stream_write = 1; - statement ok INSERT INTO product_test (id, name, category, price, stock) VALUES(6, 'Keyboard', 'Electronics', 79.99, 25), diff --git a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test index 0d86d3d55b737..cd693939daffa 100644 --- a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test +++ b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test @@ 
-613,9 +613,6 @@ S001 ST001 A Excellent Y S002 ST002 B Good Y S003 ST003 C Average N -statement ok -set enable_block_stream_write = 1 - statement ok CREATE OR REPLACE TABLE test_stream ( id INT, @@ -670,9 +667,6 @@ FROM test_stream; 9 "Richard" 33 "Austin" "hiking" "cycling" 10 "Lisa" 26 "Chicago" "gaming" "reading" -statement ok -set enable_block_stream_write = 0 - statement ok set enable_experimental_virtual_column = 0; diff --git a/tests/suites/1_stateful/05_formats/05_01_compact/05_01_02_load_compact_copy_row_per_block.sh b/tests/suites/1_stateful/05_formats/05_01_compact/05_01_02_load_compact_copy_row_per_block.sh index 4d6cafcb184f9..862680eab447d 100755 --- a/tests/suites/1_stateful/05_formats/05_01_compact/05_01_02_load_compact_copy_row_per_block.sh +++ b/tests/suites/1_stateful/05_formats/05_01_compact/05_01_02_load_compact_copy_row_per_block.sh @@ -15,7 +15,7 @@ echo "drop table if exists t1 all" | $BENDSQL_CLIENT_CONNECT echo "CREATE TABLE t1 ( c0 string -) engine=fuse row_per_block=800; +) engine=fuse row_per_block=500; " | $BENDSQL_CLIENT_CONNECT
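
Note: the new `column_statistics_builder.rs` / `column_ndv_estimator.rs` files above replace per-row `match`-on-type hashing with one typed builder per column, selected once from the column's data type and then fed whole columns. Below is a minimal, std-only sketch of that dispatch pattern for illustration; the types `SimpleColumn`, `SimpleScalar`, `TypedBuilder`, and `StatsBuilder` are hypothetical stand-ins, not Databend APIs (the real code works over `ValueType` implementations, uses `enum_dispatch`, and produces `ColumnStatistics`).

```rust
// Sketch: pick a typed min/max/null-count builder once per column, then update
// it batch by batch and finalize. Assumed/simplified types, not Databend's.

#[derive(Debug, Clone, PartialEq)]
enum SimpleScalar {
    Int(i64),
    Str(String),
}

enum SimpleColumn {
    Int(Vec<Option<i64>>),
    Str(Vec<Option<String>>),
}

#[derive(Default)]
struct TypedBuilder<T: PartialOrd + Clone> {
    min: Option<T>,
    max: Option<T>,
    null_count: usize,
}

impl<T: PartialOrd + Clone> TypedBuilder<T> {
    fn update(&mut self, values: &[Option<T>]) {
        for v in values {
            match v {
                None => self.null_count += 1,
                Some(v) => {
                    if self.min.as_ref().map_or(true, |m| v < m) {
                        self.min = Some(v.clone());
                    }
                    if self.max.as_ref().map_or(true, |m| v > m) {
                        self.max = Some(v.clone());
                    }
                }
            }
        }
    }
}

// Plays the role of `ColumnStatisticsBuilder`: static dispatch chosen from the
// column type up front, instead of re-matching on the type for every value.
enum StatsBuilder {
    Int(TypedBuilder<i64>),
    Str(TypedBuilder<String>),
}

impl StatsBuilder {
    fn for_column(col: &SimpleColumn) -> Self {
        match col {
            SimpleColumn::Int(_) => StatsBuilder::Int(TypedBuilder::default()),
            SimpleColumn::Str(_) => StatsBuilder::Str(TypedBuilder::default()),
        }
    }

    fn update(&mut self, col: &SimpleColumn) {
        match (self, col) {
            (StatsBuilder::Int(b), SimpleColumn::Int(v)) => b.update(v),
            (StatsBuilder::Str(b), SimpleColumn::Str(v)) => b.update(v),
            _ => unreachable!("builder/column type mismatch"),
        }
    }

    fn finalize(self) -> (Option<SimpleScalar>, Option<SimpleScalar>, usize) {
        match self {
            StatsBuilder::Int(b) => (
                b.min.map(SimpleScalar::Int),
                b.max.map(SimpleScalar::Int),
                b.null_count,
            ),
            StatsBuilder::Str(b) => (
                b.min.map(SimpleScalar::Str),
                b.max.map(SimpleScalar::Str),
                b.null_count,
            ),
        }
    }
}

fn main() {
    let col = SimpleColumn::Int(vec![Some(3), None, Some(1), Some(7)]);
    let mut builder = StatsBuilder::for_column(&col);
    builder.update(&col);
    // Prints: (Some(Int(1)), Some(Int(7)), 1)
    println!("{:?}", builder.finalize());
}
```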