From e2aa6cf8d01d68cae8e584e2946f62a981eccd8f Mon Sep 17 00:00:00 2001 From: dantengsky Date: Sun, 28 Apr 2024 10:17:30 +0800 Subject: [PATCH 1/3] add runtime bloom filter pruning --- .../base/src/runtime/profile/profiles.rs | 18 ++++-- .../hash_join/hash_join_build_state.rs | 25 +++++---- .../processors/transforms/hash_join/util.rs | 47 ++++++++++++++++ .../service/src/test_kits/block_writer.rs | 2 +- .../it/storages/fuse/operations/read_plan.rs | 21 ++++++- src/query/sql/src/planner/bloom_index.rs | 6 +- src/query/storages/fuse/src/fuse_part.rs | 11 ++++ .../read/agg_index/agg_index_reader_native.rs | 4 ++ .../agg_index/agg_index_reader_parquet.rs | 4 ++ .../virtual_column_reader_parquet.rs | 4 ++ .../processors/transform_serialize_block.rs | 2 +- .../storages/fuse/src/operations/merge.rs | 2 +- .../mutation/mutator/recluster_mutator.rs | 11 +++- .../read/native_data_source_reader.rs | 19 ++++++- .../operations/read/native_rows_fetcher.rs | 3 + .../read/parquet_data_source_reader.rs | 19 ++++++- .../operations/read/parquet_rows_fetcher.rs | 2 + .../operations/read/runtime_filter_prunner.rs | 55 ++++++++++++++++++- .../fuse/src/operations/read_partitions.rs | 54 ++++++++++++++++-- .../storages/fuse/src/pruning/bloom_pruner.rs | 4 +- .../storages/fuse/src/pruning/fuse_pruner.rs | 2 +- ...17_runtime_probe_side_bloom_pruning.result | 7 +++ ...0_0017_runtime_probe_side_bloom_pruning.sh | 31 +++++++++++ 23 files changed, 309 insertions(+), 44 deletions(-) create mode 100644 tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.result create mode 100755 tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.sh diff --git a/src/common/base/src/runtime/profile/profiles.rs b/src/common/base/src/runtime/profile/profiles.rs index 2aa2a7d2db850..fbf93e5145880 100644 --- a/src/common/base/src/runtime/profile/profiles.rs +++ b/src/common/base/src/runtime/profile/profiles.rs @@ -43,7 +43,8 @@ pub enum ProfileStatisticsName { SpillReadCount, SpillReadBytes, SpillReadTime, - RuntimeFilterPruneParts, + RuntimeRangeFilterPrunedParts, + RuntimeBloomFilterPrunedParts, MemoryUsage, } @@ -229,10 +230,17 @@ pub fn get_statistics_desc() -> Arc unit: StatisticsUnit::MillisSeconds, plain_statistics: true, }), - (ProfileStatisticsName::RuntimeFilterPruneParts, ProfileDesc { - display_name: "parts pruned by runtime filter", - desc: "The partitions pruned by runtime filter", - index: ProfileStatisticsName::RuntimeFilterPruneParts as usize, + (ProfileStatisticsName::RuntimeRangeFilterPrunedParts, ProfileDesc { + display_name: "parts pruned by runtime range filter", + desc: "The partitions pruned by runtime range filter", + index: ProfileStatisticsName::RuntimeRangeFilterPrunedParts as usize, + unit: StatisticsUnit::Count, + plain_statistics: true, + }), + (ProfileStatisticsName::RuntimeBloomFilterPrunedParts, ProfileDesc { + display_name: "parts pruned by runtime bloom filter", + desc: "The partitions pruned by runtime bloom filter", + index: ProfileStatisticsName::RuntimeBloomFilterPrunedParts as usize, unit: StatisticsUnit::Count, plain_statistics: true, }), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index e063c9a86578e..5cbaa5e0c8753 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ 
b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -962,68 +962,69 @@ impl HashJoinBuildState { } // Generate min max filter using build column let min_max = build_key_column.remove_nullable().domain(); + let min_max_filter = match min_max { Domain::Number(domain) => match domain { NumberDomain::UInt8(simple_domain) => { let min = Scalar::Number(NumberScalar::from(simple_domain.min)); let max = Scalar::Number(NumberScalar::from(simple_domain.max)); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } NumberDomain::UInt16(simple_domain) => { let min = Scalar::Number(NumberScalar::from(simple_domain.min)); let max = Scalar::Number(NumberScalar::from(simple_domain.max)); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } NumberDomain::UInt32(simple_domain) => { let min = Scalar::Number(NumberScalar::from(simple_domain.min)); let max = Scalar::Number(NumberScalar::from(simple_domain.max)); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } NumberDomain::UInt64(simple_domain) => { let min = Scalar::Number(NumberScalar::from(simple_domain.min)); let max = Scalar::Number(NumberScalar::from(simple_domain.max)); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } NumberDomain::Int8(simple_domain) => { let min = Scalar::Number(NumberScalar::from(simple_domain.min)); let max = Scalar::Number(NumberScalar::from(simple_domain.max)); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } NumberDomain::Int16(simple_domain) => { let min = Scalar::Number(NumberScalar::from(simple_domain.min)); let max = Scalar::Number(NumberScalar::from(simple_domain.max)); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } NumberDomain::Int32(simple_domain) => { let min = Scalar::Number(NumberScalar::from(simple_domain.min)); let max = Scalar::Number(NumberScalar::from(simple_domain.max)); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } NumberDomain::Int64(simple_domain) => { let min = Scalar::Number(NumberScalar::from(simple_domain.min)); let max = Scalar::Number(NumberScalar::from(simple_domain.max)); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } NumberDomain::Float32(simple_domain) => { let min = Scalar::Number(NumberScalar::from(simple_domain.min)); let max = Scalar::Number(NumberScalar::from(simple_domain.max)); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } NumberDomain::Float64(simple_domain) => { let min = Scalar::Number(NumberScalar::from(simple_domain.min)); let max = Scalar::Number(NumberScalar::from(simple_domain.max)); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } }, Domain::String(domain) => { let min = Scalar::String(domain.min); let max = Scalar::String(domain.max.unwrap()); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? } Domain::Date(date_domain) => { let min = Scalar::Date(date_domain.min); let max = Scalar::Date(date_domain.max); - min_max_filter(min, max, probe_key)? + min_max_filter(&self.func_ctx, min, max, probe_key)? 
            }
            _ => unreachable!(),
        };
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs
index 2b907419e4459..aa6774d020ef5 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs
@@ -17,6 +17,7 @@ use databend_common_expression::type_check;
 use databend_common_expression::types::AnyType;
 use databend_common_expression::types::DataType;
 use databend_common_expression::Column;
+use databend_common_expression::ConstantFolder;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataField;
 use databend_common_expression::DataSchemaRef;
@@ -233,10 +234,17 @@ where
 // Generate min max runtime filter
 pub(crate) fn min_max_filter(
+    func_ctx: &FunctionContext,
     min: Scalar,
     max: Scalar,
     probe_key: &Expr<String>,
 ) -> Result<Option<Expr<String>>> {
+    if min == max {
+        // If min equals max, return an `eq` expression,
+        // which can be used by both the range filter and the bloom filter.
+        return eq_filter(func_ctx, min, probe_key);
+    }
+
     if let Expr::ColumnRef {
         span,
         id,
@@ -283,3 +291,42 @@ pub(crate) fn min_max_filter(
     }
     Ok(None)
 }
+
+fn eq_filter(
+    func_ctx: &FunctionContext,
+    scalar: Scalar,
+    probe_key: &Expr<String>,
+) -> Result<Option<Expr<String>>> {
+    if let Expr::ColumnRef {
+        span,
+        id,
+        data_type,
+        display_name,
+    } = probe_key
+    {
+        let raw_probe_key = RawExpr::ColumnRef {
+            span: *span,
+            id: id.to_string(),
+            data_type: data_type.clone(),
+            display_name: display_name.clone(),
+        };
+
+        let min = RawExpr::Constant { span: None, scalar };
+        let eq_func = RawExpr::FunctionCall {
+            span: None,
+            name: "eq".to_string(),
+            params: vec![],
+            args: vec![raw_probe_key.clone(), min],
+        };
+        let expr = type_check::check(&eq_func, &BUILTIN_FUNCTIONS)?;
+
+        // Fold
+        // `Cast { expr: Constant { scalar: .., data_type: T }, dest_type: Nullable(T) }`
+        // to
+        // `Constant { scalar: .., data_type: Nullable(T) }`
+        // so that the expression can be utilized by the bloom filter.
+        let (expr, _) = ConstantFolder::fold(&expr, func_ctx, &BUILTIN_FUNCTIONS);
+        return Ok(Some(expr));
+    }
+    Ok(None)
+}
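Note on the fold above: type checking wraps the constant in a Cast to the probe column's (nullable) type, and BloomIndex::find_eq_columns only recognizes plain `column = constant` shapes, so without the fold the degenerate min == max filter would still help the range pruner but stay invisible to the bloom pruner. A minimal standalone sketch of that round trip, reusing the crates seen in this file; the column name and types are illustrative, not taken from the patch:

use databend_common_exception::Result;
use databend_common_expression::type_check;
use databend_common_expression::types::number::NumberScalar;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::ConstantFolder;
use databend_common_expression::FunctionContext;
use databend_common_expression::RawExpr;
use databend_common_expression::Scalar;
use databend_common_functions::BUILTIN_FUNCTIONS;

fn main() -> Result<()> {
    // Probe key: a nullable UInt64 column, as it would appear on the probe side.
    let probe_key = RawExpr::ColumnRef {
        span: None,
        id: "c".to_string(),
        data_type: DataType::Number(NumberDataType::UInt64).wrap_nullable(),
        display_name: "c".to_string(),
    };
    // Build side collapsed to a single value, i.e. min == max == 5.
    let constant = RawExpr::Constant {
        span: None,
        scalar: Scalar::Number(NumberScalar::UInt64(5)),
    };
    let eq = RawExpr::FunctionCall {
        span: None,
        name: "eq".to_string(),
        params: vec![],
        args: vec![probe_key, constant],
    };
    // check() inserts a Cast around the constant to match the nullable column...
    let checked = type_check::check(&eq, &BUILTIN_FUNCTIONS)?;
    // ...and fold() collapses that Cast back into a plain Constant, the shape
    // that both the range pruner and BloomIndex::find_eq_columns can consume.
    let (folded, _) =
        ConstantFolder::fold(&checked, &FunctionContext::default(), &BUILTIN_FUNCTIONS);
    println!("{}", folded.sql_display());
    Ok(())
}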
diff --git a/src/query/service/src/test_kits/block_writer.rs b/src/query/service/src/test_kits/block_writer.rs
index 2654c246f5c25..81f8a1df10216 100644
--- a/src/query/service/src/test_kits/block_writer.rs
+++ b/src/query/service/src/test_kits/block_writer.rs
@@ -109,7 +109,7 @@ impl<'a> BlockWriter<'a> {
 
         let bloom_index_cols = BloomIndexColumns::All;
         let bloom_columns_map =
-            bloom_index_cols.bloom_index_fields(schema.clone(), BloomIndex::supported_type)?;
+            bloom_index_cols.bloom_index_fields(schema.as_ref(), BloomIndex::supported_type)?;
         let maybe_bloom_index = BloomIndex::try_create(
             FunctionContext::default(),
             location.1,
diff --git a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs
index 53f75b3801db3..cc9b731d9f55f 100644
--- a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs
@@ -112,7 +112,15 @@ fn test_to_partitions() -> Result<()> {
     let column_nodes = ColumnNodes { column_nodes };
 
     // CASE I: no projection
-    let (s, parts) = FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, None);
+    let bloom_index_cols = None;
+    let (s, parts) = FuseTable::to_partitions(
+        None,
+        &blocks_metas,
+        &column_nodes,
+        None,
+        None,
+        bloom_index_cols,
+    );
     assert_eq!(parts.len(), num_of_block as usize);
     let expected_block_size: u64 = cols_metas
         .values()
@@ -140,8 +148,15 @@ fn test_to_partitions() -> Result<()> {
         ..Default::default()
     });
 
-    let (stats, parts) =
-        FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, push_down);
+    let bloom_index_cols = None;
+    let (stats, parts) = FuseTable::to_partitions(
+        None,
+        &blocks_metas,
+        &column_nodes,
+        None,
+        push_down,
+        bloom_index_cols,
+    );
     assert_eq!(parts.len(), num_of_block as usize);
     assert_eq!(expected_block_size * num_of_block, stats.read_bytes as u64);
diff --git a/src/query/sql/src/planner/bloom_index.rs b/src/query/sql/src/planner/bloom_index.rs
index b728e534c65fa..3d4289bc42475 100644
--- a/src/query/sql/src/planner/bloom_index.rs
+++ b/src/query/sql/src/planner/bloom_index.rs
@@ -20,7 +20,7 @@ use databend_common_ast::parser::tokenize_sql;
 use databend_common_ast::parser::Dialect;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::is_stream_column_id;
+use databend_common_expression::{is_stream_column_id, TableSchema};
 use databend_common_expression::ComputedExpr;
 use databend_common_expression::FieldIndex;
 use databend_common_expression::TableDataType;
@@ -32,7 +32,7 @@ use databend_common_settings::Settings;
 use crate::normalize_identifier;
 use crate::planner::semantic::NameResolutionContext;
 
-#[derive(Clone)]
+#[derive(Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
 pub enum BloomIndexColumns {
     /// Default, all columns that support bloom index.
     All,
@@ -111,7 +111,7 @@ impl BloomIndexColumns {
     /// Get table field based on the BloomIndexColumns and schema.
     pub fn bloom_index_fields<F>(
         &self,
-        schema: TableSchemaRef,
+        schema: &TableSchema,
         verify_type: F,
     ) -> Result<BTreeMap<FieldIndex, TableField>>
     where
diff --git a/src/query/storages/fuse/src/fuse_part.rs b/src/query/storages/fuse/src/fuse_part.rs
index 702807ae63f13..79b70c699d914 100644
--- a/src/query/storages/fuse/src/fuse_part.rs
+++ b/src/query/storages/fuse/src/fuse_part.rs
@@ -29,12 +29,19 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::ColumnId;
 use databend_common_expression::Scalar;
+use databend_common_sql::BloomIndexColumns;
 use databend_storages_common_pruner::BlockMetaIndex;
 use databend_storages_common_table_meta::meta::ColumnMeta;
 use databend_storages_common_table_meta::meta::ColumnStatistics;
 use databend_storages_common_table_meta::meta::Compression;
 use databend_storages_common_table_meta::meta::Location;
 
+#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)]
+pub struct BloomIndexDescriptor {
+    pub bloom_index_location: Option<Location>,
+    pub bloom_index_size: u64,
+    pub bloom_index_cols: BloomIndexColumns,
+}
+
 /// Fuse table partition information.
 #[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)]
 pub struct FuseBlockPartInfo {
@@ -48,6 +55,8 @@ pub struct FuseBlockPartInfo {
     pub sort_min_max: Option<(Scalar, Scalar)>,
 
     pub block_meta_index: Option<BlockMetaIndex>,
+
+    pub bloom_index_descriptor: Option<BloomIndexDescriptor>,
 }
 
 #[typetag::serde(name = "fuse")]
@@ -84,6 +93,7 @@ impl FuseBlockPartInfo {
         sort_min_max: Option<(Scalar, Scalar)>,
         block_meta_index: Option<BlockMetaIndex>,
         create_on: Option<DateTime<Utc>>,
+        bloom_index_descriptor: Option<BloomIndexDescriptor>,
     ) -> Arc<Box<dyn PartInfo>> {
         Arc::new(Box::new(FuseBlockPartInfo {
             location,
@@ -94,6 +104,7 @@ impl FuseBlockPartInfo {
             sort_min_max,
             block_meta_index,
             columns_stat,
+            bloom_index_descriptor,
         }))
     }
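Note: FuseBlockPartInfo crosses node boundaries via `#[typetag::serde(name = "fuse")]`, so everything embedded in it, including the new BloomIndexDescriptor and the BloomIndexColumns inside it, must round-trip through serde; that is what the new derives on BloomIndexColumns are for. A quick standalone sketch of the round trip (serde_json is only an illustrative codec here, not what the engine uses on the wire):

use databend_common_sql::BloomIndexColumns;

fn main() -> serde_json::Result<()> {
    let cols = BloomIndexColumns::All;
    let encoded = serde_json::to_string(&cols)?;
    let decoded: BloomIndexColumns = serde_json::from_str(&encoded)?;
    // PartialEq is derived alongside Serialize/Deserialize so that part-info
    // equality checks keep working once the descriptor is embedded in parts.
    assert_eq!(cols, decoded);
    println!("{encoded}");
    Ok(())
}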
diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs
index 986287b3fc87d..e5a3b0482c60e 100644
--- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs
+++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs
@@ -46,6 +46,7 @@ impl AggIndexReader {
             .enumerate()
             .map(|(i, c)| (i as u32, ColumnMeta::Native(c)))
             .collect();
+        let bloom_index_cols = None;
         let part = FuseBlockPartInfo::create(
             loc.to_string(),
             num_rows,
@@ -55,6 +56,7 @@ impl AggIndexReader {
             None,
             None,
             None,
+            bloom_index_cols,
         );
         let res = self
             .reader
@@ -98,6 +100,7 @@ impl AggIndexReader {
             .enumerate()
             .map(|(i, c)| (i as u32, ColumnMeta::Native(c)))
             .collect();
+        let bloom_index_cols = None;
        let part = FuseBlockPartInfo::create(
             loc.to_string(),
             num_rows,
@@ -107,6 +110,7 @@ impl AggIndexReader {
             None,
             None,
             None,
+            bloom_index_cols,
         );
         let res = self
             .reader
diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs
index f47d148a9b723..70895168a955e 100644
--- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs
+++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs
@@ -40,6 +40,7 @@ impl AggIndexReader {
         debug_assert_eq!(metadata.row_groups.len(), 1);
         let row_group = &metadata.row_groups[0];
         let columns_meta = build_columns_meta(row_group);
+        let bloom_index_cols = None;
         let part = FuseBlockPartInfo::create(
             loc.to_string(),
             row_group.num_rows() as u64,
@@ -49,6 +50,7 @@ impl AggIndexReader {
             None,
             None,
             None,
+            bloom_index_cols,
         );
         let res = self
             .reader
@@ -90,6 +92,7 @@ impl AggIndexReader {
             .await
             .inspect_err(|e| debug!("Read aggregating index `{loc}` failed: {e}"))
             .ok()?;
+        let bloom_index_cols = None;
         let part = FuseBlockPartInfo::create(
             loc.to_string(),
             row_group.num_rows() as u64,
@@ -99,6 +102,7 @@ impl AggIndexReader {
             None,
             None,
             None,
+            bloom_index_cols,
         );
         Some((part, res))
     }
diff --git a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs
index f23e56e863171..210447d286967 100644
--- a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs
+++ b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs
@@ -82,6 +82,7 @@ impl VirtualColumnReader {
         let (ranges, ignore_column_ids) = self.read_columns_meta(&schema, &columns_meta);
 
         if !ranges.is_empty() {
+            let bloom_index_cols = None;
             let part = FuseBlockPartInfo::create(
                 loc.to_string(),
                 row_group.num_rows() as u64,
@@ -91,6 +92,7 @@ impl VirtualColumnReader {
                 None,
                 None,
                 None,
+                bloom_index_cols,
             );
 
             let merge_io_result =
@@ -124,6 +126,7 @@ impl VirtualColumnReader {
         let (ranges, ignore_column_ids) = self.read_columns_meta(&schema, &columns_meta);
 
         if !ranges.is_empty() {
+            let bloom_index_cols = None;
             let part = FuseBlockPartInfo::create(
                 loc.to_string(),
                 row_group.num_rows() as u64,
@@ -133,6 +136,7 @@ impl VirtualColumnReader {
                 None,
                 None,
                 None,
+                bloom_index_cols,
             );
 
             let merge_io_result = BlockReader::merge_io_read(
diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs
index 838cb82a29484..a8eb846bf938a 100644
--- a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs
+++ b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs
@@ -126,7 +126,7 @@ impl TransformSerializeBlock {
 
         let bloom_columns_map = table
             .bloom_index_cols
-            .bloom_index_fields(source_schema.clone(), BloomIndex::supported_type)?;
+            .bloom_index_fields(source_schema.as_ref(), BloomIndex::supported_type)?;
         let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta);
 
diff --git a/src/query/storages/fuse/src/operations/merge.rs b/src/query/storages/fuse/src/operations/merge.rs
index 4fee803460c74..07539ab1436fa 100644
--- a/src/query/storages/fuse/src/operations/merge.rs
+++ b/src/query/storages/fuse/src/operations/merge.rs
@@ -89,7 +89,7 @@ impl FuseTable {
             .into();
         let bloom_columns_map = self
             .bloom_index_cols()
-            .bloom_index_fields(new_schema.clone(), BloomIndex::supported_type)?;
+            .bloom_index_fields(new_schema.as_ref(), BloomIndex::supported_type)?;
         let inverted_index_builders = create_inverted_index_builders(&self.table_info.meta);
 
         let block_builder = BlockBuilder {
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
index 721e4136506f4..aced10829c42d 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
@@ -253,8 +253,15 @@ impl ReclusterMutator {
         total_bytes: usize,
         level: i32,
     ) {
-        let (stats, parts) =
-            FuseTable::to_partitions(Some(&self.schema), block_metas, column_nodes, None, None);
+        let bloom_index_cols = None;
+        let (stats, parts) = FuseTable::to_partitions(
+            Some(&self.schema),
+            block_metas,
+            column_nodes,
+            None,
+            None,
+            bloom_index_cols,
+        );
         let task = ReclusterTask {
             parts,
             stats,
diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs
index 372580b97415a..b0b319079d4b2 100644
--- a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs
+++ b/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs
@@ -38,7 +38,8 @@ use crate::io::BlockReader;
 use crate::io::TableMetaLocationGenerator;
 use crate::io::VirtualColumnReader;
 use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
-use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;
+use crate::operations::read::runtime_filter_prunner::runtime_bloom_filter_prune;
+use crate::operations::read::runtime_filter_prunner::runtime_range_filter_prune;
 use crate::FuseBlockPartInfo;
 
 pub struct ReadNativeDataSource<const BLOCKING_IO: bool> {
@@ -139,7 +140,7 @@ impl SyncSource for ReadNativeDataSource<true> {
                     .ctx
                     .get_min_max_runtime_filter_with_id(self.table_index),
             );
-            if runtime_filter_pruner(
+            if runtime_range_filter_prune(
                 self.table_schema.clone(),
                 &part,
                 &filters,
@@ -246,7 +247,7 @@ impl Processor for ReadNativeDataSource<false> {
             );
             let mut native_part_infos = Vec::with_capacity(parts.len());
             for part in parts.into_iter() {
-                if runtime_filter_pruner(
+                if runtime_range_filter_prune(
                     self.table_schema.clone(),
                     &part,
                     &filters,
@@ -255,6 +256,18 @@ impl Processor for ReadNativeDataSource<false> {
                     continue;
                 }
 
+                if runtime_bloom_filter_prune(
+                    self.table_schema.clone(),
+                    &part,
+                    &filters,
+                    &self.func_ctx,
+                    &self.block_reader.operator,
+                )
+                .await?
+                {
+                    continue;
+                }
+
                 native_part_infos.push(part.clone());
                 let block_reader = self.block_reader.clone();
                 let index_reader = self.index_reader.clone();
diff --git a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
index a2eebac1ef03e..5c9b8cab99b0d 100644
--- a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
@@ -223,12 +223,15 @@ impl<const BLOCKING_IO: bool> NativeRowsFetcher<BLOCKING_IO> {
                 let block_idx = block_idx_in_segment(blocks.len(), block as usize);
                 let block_meta = &blocks[block_idx];
                 let page_size = block_meta.page_size();
+
+                let bloom_index_cols = None;
                 let part_info = FuseTable::projection_part(
                     block_meta,
                     &None,
                     &column_nodes,
                     None,
                     &self.projection,
+                    bloom_index_cols,
                 );
                 self.part_map.insert(prefix, (part_info, page_size));
             }
diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
index 96f39ece964b0..8aff40960f429 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
@@ -40,7 +40,8 @@ use crate::io::ReadSettings;
 use crate::io::TableMetaLocationGenerator;
 use crate::io::VirtualColumnReader;
 use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
-use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;
+use crate::operations::read::runtime_filter_prunner::runtime_bloom_filter_prune;
+use crate::operations::read::runtime_filter_prunner::runtime_range_filter_prune;
 
 pub struct ReadParquetDataSource<const BLOCKING_IO: bool> {
     func_ctx: FunctionContext,
@@ -127,7 +128,7 @@ impl SyncSource for ReadParquetDataSource<true> {
                     .ctx
                     .get_min_max_runtime_filter_with_id(self.table_index),
             );
-            if runtime_filter_pruner(
+            if runtime_range_filter_prune(
                 self.table_schema.clone(),
                 &part,
                 &filters,
@@ -243,7 +244,7 @@ impl Processor for ReadParquetDataSource<false> {
             );
             let mut fuse_part_infos = Vec::with_capacity(parts.len());
             for part in parts.into_iter() {
-                if runtime_filter_pruner(
+                if runtime_range_filter_prune(
                     self.table_schema.clone(),
                     &part,
                     &filters,
@@ -252,6 +253,18 @@ impl Processor for ReadParquetDataSource<false> {
                     continue;
                 }
 
+                if runtime_bloom_filter_prune(
+                    self.table_schema.clone(),
+                    &part,
+                    &filters,
+                    &self.func_ctx,
+                    &self.block_reader.operator,
+                )
+                .await?
+                {
+                    continue;
+                }
+
                 fuse_part_infos.push(part.clone());
                 let block_reader = self.block_reader.clone();
                 let settings = ReadSettings::from_ctx(&self.partitions.ctx)?;
diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index 35159c3726246..eb73ff4dfefaa 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -203,12 +203,14 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
         let blocks = self.segment_blocks_cache.get(&segment).unwrap();
         let block_idx = block_idx_in_segment(blocks.len(), block as usize);
         let block_meta = &blocks[block_idx];
+        let bloom_index_cols = None;
         let part_info = FuseTable::projection_part(
             block_meta,
             &None,
             &column_nodes,
             None,
             &self.projection,
+            bloom_index_cols,
         );
 
         self.part_map.insert(prefix, part_info);
diff --git a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs
index acb2c721af908..a14dc224ecbcf 100644
--- a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs
+++ b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs
@@ -37,12 +37,14 @@ use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_hashtable::FastHash;
 use databend_storages_common_index::statistics_to_domain;
 use log::info;
+use opendal::Operator;
 use xorf::BinaryFuse16;
 use xorf::Filter;
 
+use crate::pruning::BloomPrunerCreator;
 use crate::FuseBlockPartInfo;
 
-pub fn runtime_filter_pruner(
+pub fn runtime_range_filter_prune(
     table_schema: Arc<TableSchema>,
     part: &PartInfoPtr,
     filters: &[Expr<String>],
@@ -90,10 +92,57 @@ pub fn runtime_filter_pruner(
 
     if pruned {
         info!(
-            "Pruned partition with {:?} rows by runtime filter",
+            "Pruned partition with {:?} rows by runtime range filter",
             part.nums_rows
         );
-        Profile::record_usize_profile(ProfileStatisticsName::RuntimeFilterPruneParts, 1);
+        Profile::record_usize_profile(ProfileStatisticsName::RuntimeRangeFilterPrunedParts, 1);
+    }
+
+    Ok(pruned)
+}
+
+pub async fn runtime_bloom_filter_prune(
+    table_schema: Arc<TableSchema>,
+    part: &PartInfoPtr,
+    filters: &[Expr<String>],
+    func_ctx: &FunctionContext,
+    dal: &Operator,
+) -> Result<bool> {
+    let part = FuseBlockPartInfo::from_part(part)?;
+
+    let mut pruned = false;
+
+    if let Some(bloom_desc) = &part.bloom_index_descriptor {
+        let index_location = bloom_desc.bloom_index_location.clone();
+        let index_size = bloom_desc.bloom_index_size;
+        let column_ids = part.columns_meta.keys().cloned().collect::<Vec<ColumnId>>();
+        let bloom_index_cols = &bloom_desc.bloom_index_cols;
+
+        for filter_expr in filters {
+            if let Some(bloom_pruner) = BloomPrunerCreator::create(
+                func_ctx.clone(),
+                &table_schema,
+                dal.clone(),
+                Some(filter_expr),
+                bloom_index_cols,
+            )?
+            {
+                let should_keep = bloom_pruner
+                    .should_keep(&index_location, index_size, column_ids.clone())
+                    .await;
+                if !should_keep {
+                    pruned = true;
+                    break;
+                }
+            }
+        }
+    }
+
+    if pruned {
+        info!(
+            "Pruned partition with {:?} rows by runtime bloom filter",
+            part.nums_rows
+        );
+        Profile::record_usize_profile(ProfileStatisticsName::RuntimeBloomFilterPrunedParts, 1);
     }
 
     Ok(pruned)
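Note: the two-step call order in the readers above is deliberate. runtime_range_filter_prune is pure in-memory arithmetic over cached part statistics, while runtime_bloom_filter_prune may fetch bloom index data from storage (hence async, and hence it is only wired into the non-blocking Processor paths, not the blocking SyncSource paths). A self-contained toy model of the two-stage decision, with a HashSet standing in for the persisted bloom index and hard-coded stats standing in for part metadata; the values mirror the regression test added below:

use std::collections::HashSet;

/// Stand-in for the pruning-relevant metadata of one part.
struct Part {
    min: u64,
    max: u64,
    /// Stand-in for the part's bloom index (exact here; real bloom
    /// filters allow false positives but never false negatives).
    bloom: HashSet<u64>,
}

/// Stage 1: cheap range-overlap test against the build-side min/max.
fn range_prunes(part: &Part, build_min: u64, build_max: u64) -> bool {
    part.max < build_min || part.min > build_max
}

/// Stage 2: point lookup; in the real code this is the IO-bound step,
/// which is why it only runs on parts that survive stage 1.
fn bloom_prunes(part: &Part, build_key: u64) -> bool {
    !part.bloom.contains(&build_key)
}

fn main() {
    // Two probe-side parts; the build side holds the single key 5,
    // and 5 has been deleted from the second part.
    let parts = vec![
        Part { min: 1, max: 3, bloom: [1, 2, 3].into_iter().collect() },
        Part { min: 4, max: 6, bloom: [4, 6].into_iter().collect() },
    ];
    let (build_min, build_max, build_key) = (5, 5, 5);

    for (i, part) in parts.iter().enumerate() {
        if range_prunes(part, build_min, build_max) {
            println!("part {i}: pruned by runtime range filter");
        } else if bloom_prunes(part, build_key) {
            println!("part {i}: pruned by runtime bloom filter");
        } else {
            println!("part {i}: must be scanned");
        }
    }
}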
diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs
index a06fbe8923426..2510d1fa4282e 100644
--- a/src/query/storages/fuse/src/operations/read_partitions.rs
+++ b/src/query/storages/fuse/src/operations/read_partitions.rs
@@ -30,6 +30,7 @@ use databend_common_exception::Result;
 use databend_common_expression::Scalar;
 use databend_common_expression::TableSchemaRef;
 use databend_common_sql::field_default_value;
+use databend_common_sql::BloomIndexColumns;
 use databend_common_storage::ColumnNodes;
 use databend_storages_common_cache::CacheAccessor;
 use databend_storages_common_cache_manager::CachedObject;
@@ -43,6 +44,7 @@ use opendal::Operator;
 use sha2::Digest;
 use sha2::Sha256;
 
+use crate::fuse_part::BloomIndexDescriptor;
 use crate::fuse_part::FuseBlockPartInfo;
 use crate::pruning::create_segment_location_vector;
 use crate::pruning::FusePruner;
@@ -243,8 +245,14 @@ impl FuseTable {
             .map(|topk| field_default_value(ctx.clone(), &topk.field).map(|d| (topk, d)))
             .transpose()?;
 
-        let (mut statistics, parts) =
-            Self::to_partitions(Some(&schema), block_metas, &column_nodes, top_k, push_downs);
+        let (mut statistics, parts) = Self::to_partitions(
+            Some(&schema),
+            block_metas,
+            &column_nodes,
+            top_k,
+            push_downs,
+            Some(self.bloom_index_cols.clone()),
+        );
 
         // Update planner statistics.
         statistics.partitions_total = partitions_total;
@@ -266,6 +274,7 @@ impl FuseTable {
         column_nodes: &ColumnNodes,
         top_k: Option<(TopK, Scalar)>,
         push_downs: Option<PushDownInfo>,
+        bloom_index_cols: Option<BloomIndexColumns>,
     ) -> (PartStatistics, Partitions) {
         let limit = push_downs
             .as_ref()
@@ -311,15 +320,28 @@ impl FuseTable {
         }
 
         let (mut statistics, mut partitions) = match &push_downs {
-            None => Self::all_columns_partitions(schema, &block_metas, top_k.clone(), limit),
+            None => Self::all_columns_partitions(
+                schema,
+                &block_metas,
+                top_k.clone(),
+                limit,
+                bloom_index_cols,
+            ),
             Some(extras) => match &extras.projection {
-                None => Self::all_columns_partitions(schema, &block_metas, top_k.clone(), limit),
+                None => Self::all_columns_partitions(
+                    schema,
+                    &block_metas,
+                    top_k.clone(),
+                    limit,
+                    bloom_index_cols,
+                ),
                 Some(projection) => Self::projection_partitions(
                     &block_metas,
                     column_nodes,
                     projection,
                     top_k.clone(),
                     limit,
+                    bloom_index_cols,
                 ),
             },
         };
@@ -343,6 +365,7 @@ impl FuseTable {
         block_metas: &[(Option<BlockMetaIndex>, Arc<BlockMeta>)],
         top_k: Option<(TopK, Scalar)>,
         limit: usize,
+        bloom_index_cols: Option<BloomIndexColumns>,
     ) -> (PartStatistics, Partitions) {
         let mut statistics = PartStatistics::default_exact();
         let mut partitions = Partitions::create(PartitionsShuffleKind::Mod, vec![]);
@@ -359,6 +382,7 @@ impl FuseTable {
                 block_meta_index,
                 &top_k,
                 block_meta,
+                bloom_index_cols.clone(),
             ));
             statistics.read_rows += rows;
             statistics.read_bytes += block_meta.block_size as usize;
@@ -383,6 +407,7 @@ impl FuseTable {
         projection: &Projection,
         top_k: Option<(TopK, Scalar)>,
         limit: usize,
+        bloom_index_cols: Option<BloomIndexColumns>,
     ) -> (PartStatistics, Partitions) {
         let mut statistics = PartStatistics::default_exact();
         let mut partitions = Partitions::default();
@@ -401,6 +426,7 @@ impl FuseTable {
                 column_nodes,
                 top_k.clone(),
                 projection,
+                bloom_index_cols.clone(),
             ));
 
             let rows = block_meta.row_count as usize;
@@ -434,6 +460,7 @@ impl FuseTable {
         block_meta_index: &Option<BlockMetaIndex>,
         top_k: &Option<(TopK, Scalar)>,
         meta: &BlockMeta,
+        bloom_index_cols: Option<BloomIndexColumns>,
     ) -> PartInfoPtr {
         let mut columns_meta = HashMap::with_capacity(meta.col_metas.len());
         let mut columns_stats = HashMap::with_capacity(meta.col_stats.len());
@@ -467,6 +494,8 @@ impl FuseTable {
                 .unwrap_or((default.clone(), default.clone()))
         });
 
+        let bloom_index_descriptor = Self::build_bloom_index_descriptor(meta, bloom_index_cols);
+
         FuseBlockPartInfo::create(
             location,
             rows_count,
@@ -476,6 +505,7 @@ impl FuseTable {
             sort_min_max,
             block_meta_index.to_owned(),
             create_on,
+            bloom_index_descriptor,
         )
     }
 
@@ -485,6 +515,7 @@ impl FuseTable {
         column_nodes: &ColumnNodes,
         top_k: Option<(TopK, Scalar)>,
         projection: &Projection,
+        bloom_index_cols: Option<BloomIndexColumns>,
     ) -> PartInfoPtr {
         let mut columns_meta = HashMap::with_capacity(projection.len());
         let mut columns_stat = HashMap::with_capacity(projection.len());
@@ -515,6 +546,9 @@ impl FuseTable {
         // TODO
         // row_count should be a hint value of LIMIT,
         // not the count the rows in this partition
+
+        let bloom_descriptor = Self::build_bloom_index_descriptor(meta, bloom_index_cols);
+
         FuseBlockPartInfo::create(
             location,
             rows_count,
@@ -524,6 +558,18 @@ impl FuseTable {
             sort_min_max,
             block_meta_index.to_owned(),
             create_on,
+            bloom_descriptor,
         )
     }
+
+    fn build_bloom_index_descriptor(
+        block_meta: &BlockMeta,
+        bloom_index_cols_opt: Option<BloomIndexColumns>,
+    ) -> Option<BloomIndexDescriptor> {
+        bloom_index_cols_opt.map(|v| BloomIndexDescriptor {
+            bloom_index_location: block_meta.bloom_filter_index_location.clone(),
+            bloom_index_size: block_meta.bloom_filter_index_size,
+            bloom_index_cols: v,
+        })
+    }
 }
diff --git a/src/query/storages/fuse/src/pruning/bloom_pruner.rs b/src/query/storages/fuse/src/pruning/bloom_pruner.rs
index 2cc20a229ec1e..dca68439dc60a 100644
--- a/src/query/storages/fuse/src/pruning/bloom_pruner.rs
+++ b/src/query/storages/fuse/src/pruning/bloom_pruner.rs
@@ -69,11 +69,11 @@ impl BloomPrunerCreator {
         schema: &TableSchemaRef,
         dal: Operator,
         filter_expr: Option<&Expr<String>>,
-        bloom_index_cols: BloomIndexColumns,
+        bloom_index_cols: &BloomIndexColumns,
     ) -> Result<Option<Arc<dyn BloomPruner + Send + Sync>>> {
         if let Some(expr) = filter_expr {
             let bloom_columns_map =
-                bloom_index_cols.bloom_index_fields(schema.clone(), BloomIndex::supported_type)?;
+                bloom_index_cols.bloom_index_fields(schema.as_ref(), BloomIndex::supported_type)?;
             let bloom_column_fields = bloom_columns_map.values().cloned().collect::<Vec<TableField>>();
 
             let point_query_cols = BloomIndex::find_eq_columns(expr, bloom_column_fields)?;
diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs
index 786620214b071..d042cbc7d343d 100644
--- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs
+++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs
@@ -130,7 +130,7 @@ impl PruningContext {
             &table_schema,
             dal.clone(),
             filter_expr.as_ref(),
-            bloom_index_cols,
+            &bloom_index_cols,
         )?;
 
         // Page pruner, used in native format
diff --git a/tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.result b/tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.result
new file mode 100644
index 0000000000000..af0379a35295e
--- /dev/null
+++ b/tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.result
@@ -0,0 +1,7 @@
+The probe table should consist of 2 blocks
+2
+runtime range filter should work, one of the blocks should be pruned by range filter
+ ├── parts pruned by runtime range filter: 1
+runtime bloom filter should work, another block should be pruned by bloom filter
+ ├── parts pruned by runtime range filter: 1
+ ├── parts pruned by runtime bloom filter: 1
diff --git a/tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.sh b/tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.sh
new file mode 100755
index 0000000000000..7b6b47423c94e
--- /dev/null
+++ b/tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. "$CURDIR"/../../../shell_env.sh
+
+echo "create or replace database rt_bloom" | $BENDSQL_CLIENT_CONNECT
+
+echo "create table rt_bloom.probe(c uint64) as select * from numbers(100000)" | $BENDSQL_CLIENT_CONNECT
+echo "create table rt_bloom.build(c uint64)" | $BENDSQL_CLIENT_CONNECT
+
+echo "The probe table should consist of 2 blocks"
+echo "select block_count from fuse_snapshot('rt_bloom','probe')" | $BENDSQL_CLIENT_CONNECT
+
+
+echo "insert into rt_bloom.build values(50)" | $BENDSQL_CLIENT_CONNECT
+
+
+echo "runtime range filter should work, one of the blocks should be pruned by range filter"
+echo "explain analyze select * from rt_bloom.probe inner join rt_bloom.build on probe.c = build.c " \
+    | $BENDSQL_CLIENT_CONNECT | grep "parts pruned by"
+
+
+
+echo "delete from rt_bloom.probe where c = 50" | $BENDSQL_CLIENT_CONNECT;
+echo "runtime bloom filter should work, another block should be pruned by bloom filter"
+echo "explain analyze select * from rt_bloom.probe inner join rt_bloom.build on probe.c = build.c " \
+    | $BENDSQL_CLIENT_CONNECT | grep "parts pruned by"
+
+echo "DROP TABLE rt_bloom.probe" | $BENDSQL_CLIENT_CONNECT
+echo "DROP TABLE rt_bloom.build" | $BENDSQL_CLIENT_CONNECT
+echo "drop database rt_bloom" | $BENDSQL_CLIENT_CONNECT

From 5a4858ef520e3efe3f4666221a9447ff5c752aa6 Mon Sep 17 00:00:00 2001
From: dantengsky
Date: Mon, 6 May 2024 13:16:45 +0800
Subject: [PATCH 2/3] move test to stateful suite (which uses non-blocking I/O)

---
 src/query/sql/src/planner/bloom_index.rs                      | 3 ++-
 .../02_query/02_0006_runtime_probe_side_bloom_pruning.result} | 0
 .../02_query/02_0006_runtime_probe_side_bloom_pruning.sh}     | 4 ++++
 3 files changed, 6 insertions(+), 1 deletion(-)
 rename tests/suites/{0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.result => 1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.result} (100%)
 rename tests/suites/{0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.sh => 1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.sh} (87%)

diff --git a/src/query/sql/src/planner/bloom_index.rs b/src/query/sql/src/planner/bloom_index.rs
index 3d4289bc42475..3b0bf556f54e7 100644
--- a/src/query/sql/src/planner/bloom_index.rs
+++ b/src/query/sql/src/planner/bloom_index.rs
@@ -20,11 +20,12 @@ use databend_common_ast::parser::tokenize_sql;
 use databend_common_ast::parser::Dialect;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::{is_stream_column_id, TableSchema};
+use databend_common_expression::is_stream_column_id;
 use databend_common_expression::ComputedExpr;
 use databend_common_expression::FieldIndex;
 use databend_common_expression::TableDataType;
 use databend_common_expression::TableField;
+use databend_common_expression::TableSchema;
 use databend_common_expression::TableSchemaRef;
 use databend_common_meta_app::tenant::Tenant;
 use databend_common_settings::Settings;
diff --git a/tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.result b/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.result
similarity index 100%
rename from tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.result
rename to tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.result
diff --git a/tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.sh b/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.sh
similarity index 87%
rename from tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.sh
rename to tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.sh
index 7b6b47423c94e..d57b1f8501c2f 100755
--- a/tests/suites/0_stateless/20+_others/20_0017_runtime_probe_side_bloom_pruning.sh
+++ b/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.sh
@@ -3,6 +3,10 @@
 CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 . "$CURDIR"/../../../shell_env.sh
 
+
+# Currently, runtime bloom pruning (probe side) only supports the non-blocking block reader,
+# so this case must be tested with a non-blocking storage type (MinIO / S3 etc., not local fs).
+
 echo "create or replace database rt_bloom" | $BENDSQL_CLIENT_CONNECT
 
 echo "create table rt_bloom.probe(c uint64) as select * from numbers(100000)" | $BENDSQL_CLIENT_CONNECT

From d956b8c4d322eb647a2a34fd2b87eebd70d36509 Mon Sep 17 00:00:00 2001
From: dantengsky
Date: Mon, 6 May 2024 13:45:08 +0800
Subject: [PATCH 3/3] tweak stateful test

---
 ...06_runtime_probe_side_bloom_pruning.result |  6 +++---
 ...2_0006_runtime_probe_side_bloom_pruning.sh | 19 +++++++++++--------
 2 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.result b/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.result
index af0379a35295e..0cc59b85a9ba2 100644
--- a/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.result
+++ b/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.result
@@ -1,7 +1,7 @@
 The probe table should consist of 2 blocks
 2
 runtime range filter should work, one of the blocks should be pruned by range filter
- ├── parts pruned by runtime range filter: 1
+├── parts pruned by runtime range filter: 1
 runtime bloom filter should work, another block should be pruned by bloom filter
- ├── parts pruned by runtime range filter: 1
- ├── parts pruned by runtime bloom filter: 1
+├── parts pruned by runtime range filter: 1
+├── parts pruned by runtime bloom filter: 1
diff --git a/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.sh b/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.sh
index d57b1f8501c2f..a0105c7508b7b 100755
--- a/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.sh
+++ b/tests/suites/1_stateful/02_query/02_0006_runtime_probe_side_bloom_pruning.sh
@@ -9,26 +9,29 @@
 
 echo "create or replace database rt_bloom" | $BENDSQL_CLIENT_CONNECT
 
-echo "create table rt_bloom.probe(c uint64) as select * from numbers(100000)" | $BENDSQL_CLIENT_CONNECT
+echo "create table rt_bloom.probe(c uint64)" | $BENDSQL_CLIENT_CONNECT
 echo "create table rt_bloom.build(c uint64)" | $BENDSQL_CLIENT_CONNECT
 
-echo "The probe table should consist of 2 blocks"
-echo "select block_count from fuse_snapshot('rt_bloom','probe')" | $BENDSQL_CLIENT_CONNECT
+echo "insert into rt_bloom.probe values (1),(2),(3)" | $BENDSQL_CLIENT_CONNECT
+echo "insert into rt_bloom.probe values (4),(5),(6)" | $BENDSQL_CLIENT_CONNECT
 
+echo "The probe table should consist of 2 blocks"
+echo "select block_count from fuse_snapshot('rt_bloom','probe') limit 1" | $BENDSQL_CLIENT_CONNECT
 
-echo "insert into rt_bloom.build values(50)" | $BENDSQL_CLIENT_CONNECT
-
+echo "insert into rt_bloom.build values(5)" | $BENDSQL_CLIENT_CONNECT
 
 echo "runtime range filter should work, one of the blocks should be pruned by range filter"
+# leading spaces are trimmed, as the distributed plan has different indentation
 echo "explain analyze select * from rt_bloom.probe inner join rt_bloom.build on probe.c = build.c " \
-    | $BENDSQL_CLIENT_CONNECT | grep "parts pruned by"
-
+    | $BENDSQL_CLIENT_CONNECT | grep "parts pruned by" | awk '{$1=$1}1'
 
-
-echo "delete from rt_bloom.probe where c = 50" | $BENDSQL_CLIENT_CONNECT;
+echo "delete from rt_bloom.probe where c = 5" | $BENDSQL_CLIENT_CONNECT;
 echo "runtime bloom filter should work, another block should be pruned by bloom filter"
+# leading spaces are trimmed, as the distributed plan has different indentation
 echo "explain analyze select * from rt_bloom.probe inner join rt_bloom.build on probe.c = build.c " \
-    | $BENDSQL_CLIENT_CONNECT | grep "parts pruned by"
+    | $BENDSQL_CLIENT_CONNECT | grep "parts pruned by" | awk '{$1=$1}1'
 
 echo "DROP TABLE rt_bloom.probe" | $BENDSQL_CLIENT_CONNECT
 echo "DROP TABLE rt_bloom.build" | $BENDSQL_CLIENT_CONNECT