Skip to content

Commit e2aa6cf

Browse files
committed
add runtime bloom filter pruning
1 parent 95eb6df commit e2aa6cf

File tree

23 files changed

+309
-44
lines changed

23 files changed

+309
-44
lines changed

src/common/base/src/runtime/profile/profiles.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ pub enum ProfileStatisticsName {
4343
SpillReadCount,
4444
SpillReadBytes,
4545
SpillReadTime,
46-
RuntimeFilterPruneParts,
46+
RuntimeRangeFilterPrunedParts,
47+
RuntimeBloomFilterPrunedParts,
4748
MemoryUsage,
4849
}
4950

@@ -229,10 +230,17 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
229230
unit: StatisticsUnit::MillisSeconds,
230231
plain_statistics: true,
231232
}),
232-
(ProfileStatisticsName::RuntimeFilterPruneParts, ProfileDesc {
233-
display_name: "parts pruned by runtime filter",
234-
desc: "The partitions pruned by runtime filter",
235-
index: ProfileStatisticsName::RuntimeFilterPruneParts as usize,
233+
(ProfileStatisticsName::RuntimeRangeFilterPrunedParts, ProfileDesc {
234+
display_name: "parts pruned by runtime range filter",
235+
desc: "The partitions pruned by runtime range filter",
236+
index: ProfileStatisticsName::RuntimeRangeFilterPrunedParts as usize,
237+
unit: StatisticsUnit::Count,
238+
plain_statistics: true,
239+
}),
240+
(ProfileStatisticsName::RuntimeBloomFilterPrunedParts, ProfileDesc {
241+
display_name: "parts pruned by runtime bloom filter",
242+
desc: "The partitions pruned by runtime bloom filter",
243+
index: ProfileStatisticsName::RuntimeBloomFilterPrunedParts as usize,
236244
unit: StatisticsUnit::Count,
237245
plain_statistics: true,
238246
}),

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -962,68 +962,69 @@ impl HashJoinBuildState {
962962
}
963963
// Generate min max filter using build column
964964
let min_max = build_key_column.remove_nullable().domain();
965+
965966
let min_max_filter = match min_max {
966967
Domain::Number(domain) => match domain {
967968
NumberDomain::UInt8(simple_domain) => {
968969
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
969970
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
970-
min_max_filter(min, max, probe_key)?
971+
min_max_filter(&self.func_ctx, min, max, probe_key)?
971972
}
972973
NumberDomain::UInt16(simple_domain) => {
973974
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
974975
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
975-
min_max_filter(min, max, probe_key)?
976+
min_max_filter(&self.func_ctx, min, max, probe_key)?
976977
}
977978
NumberDomain::UInt32(simple_domain) => {
978979
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
979980
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
980-
min_max_filter(min, max, probe_key)?
981+
min_max_filter(&self.func_ctx, min, max, probe_key)?
981982
}
982983
NumberDomain::UInt64(simple_domain) => {
983984
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
984985
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
985-
min_max_filter(min, max, probe_key)?
986+
min_max_filter(&self.func_ctx, min, max, probe_key)?
986987
}
987988
NumberDomain::Int8(simple_domain) => {
988989
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
989990
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
990-
min_max_filter(min, max, probe_key)?
991+
min_max_filter(&self.func_ctx, min, max, probe_key)?
991992
}
992993
NumberDomain::Int16(simple_domain) => {
993994
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
994995
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
995-
min_max_filter(min, max, probe_key)?
996+
min_max_filter(&self.func_ctx, min, max, probe_key)?
996997
}
997998
NumberDomain::Int32(simple_domain) => {
998999
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
9991000
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
1000-
min_max_filter(min, max, probe_key)?
1001+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10011002
}
10021003
NumberDomain::Int64(simple_domain) => {
10031004
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
10041005
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
1005-
min_max_filter(min, max, probe_key)?
1006+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10061007
}
10071008
NumberDomain::Float32(simple_domain) => {
10081009
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
10091010
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
1010-
min_max_filter(min, max, probe_key)?
1011+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10111012
}
10121013
NumberDomain::Float64(simple_domain) => {
10131014
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
10141015
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
1015-
min_max_filter(min, max, probe_key)?
1016+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10161017
}
10171018
},
10181019
Domain::String(domain) => {
10191020
let min = Scalar::String(domain.min);
10201021
let max = Scalar::String(domain.max.unwrap());
1021-
min_max_filter(min, max, probe_key)?
1022+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10221023
}
10231024
Domain::Date(date_domain) => {
10241025
let min = Scalar::Date(date_domain.min);
10251026
let max = Scalar::Date(date_domain.max);
1026-
min_max_filter(min, max, probe_key)?
1027+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10271028
}
10281029
_ => unreachable!(),
10291030
};

src/query/service/src/pipelines/processors/transforms/hash_join/util.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use databend_common_expression::type_check;
1717
use databend_common_expression::types::AnyType;
1818
use databend_common_expression::types::DataType;
1919
use databend_common_expression::Column;
20+
use databend_common_expression::ConstantFolder;
2021
use databend_common_expression::DataBlock;
2122
use databend_common_expression::DataField;
2223
use databend_common_expression::DataSchemaRef;
@@ -233,10 +234,17 @@ where
233234

234235
// Generate min max runtime filter
235236
pub(crate) fn min_max_filter(
237+
func_ctx: &FunctionContext,
236238
min: Scalar,
237239
max: Scalar,
238240
probe_key: &Expr<String>,
239241
) -> Result<Option<Expr<String>>> {
242+
if min == max {
243+
// if min equals max, return a `eq` expression
244+
// which can be used by both range filter and bloom filter
245+
return eq_filter(func_ctx, min, probe_key);
246+
}
247+
240248
if let Expr::ColumnRef {
241249
span,
242250
id,
@@ -283,3 +291,42 @@ pub(crate) fn min_max_filter(
283291
}
284292
Ok(None)
285293
}
294+
295+
fn eq_filter(
296+
func_ctx: &FunctionContext,
297+
scalar: Scalar,
298+
probe_key: &Expr<String>,
299+
) -> Result<Option<Expr<String>>> {
300+
if let Expr::ColumnRef {
301+
span,
302+
id,
303+
data_type,
304+
display_name,
305+
} = probe_key
306+
{
307+
let raw_probe_key = RawExpr::ColumnRef {
308+
span: *span,
309+
id: id.to_string(),
310+
data_type: data_type.clone(),
311+
display_name: display_name.clone(),
312+
};
313+
314+
let min = RawExpr::Constant { span: None, scalar };
315+
let eq_func = RawExpr::FunctionCall {
316+
span: None,
317+
name: "eq".to_string(),
318+
params: vec![],
319+
args: vec![raw_probe_key.clone(), min],
320+
};
321+
let expr = type_check::check(&eq_func, &BUILTIN_FUNCTIONS)?;
322+
323+
// Fold
324+
// `Cast { expr: Constant { scalar: .., data_type: T }, dest_type: Nullable(T) }`
325+
// to
326+
// `Constant { scalar: .., data_type: Nullable(T) }`
327+
// so that the expression can be utilized by bloom filter
328+
let (expr, _) = ConstantFolder::fold(&expr, func_ctx, &BUILTIN_FUNCTIONS);
329+
return Ok(Some(expr));
330+
}
331+
Ok(None)
332+
}

src/query/service/src/test_kits/block_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl<'a> BlockWriter<'a> {
109109

110110
let bloom_index_cols = BloomIndexColumns::All;
111111
let bloom_columns_map =
112-
bloom_index_cols.bloom_index_fields(schema.clone(), BloomIndex::supported_type)?;
112+
bloom_index_cols.bloom_index_fields(schema.as_ref(), BloomIndex::supported_type)?;
113113
let maybe_bloom_index = BloomIndex::try_create(
114114
FunctionContext::default(),
115115
location.1,

src/query/service/tests/it/storages/fuse/operations/read_plan.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,15 @@ fn test_to_partitions() -> Result<()> {
112112
let column_nodes = ColumnNodes { column_nodes };
113113

114114
// CASE I: no projection
115-
let (s, parts) = FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, None);
115+
let bloom_index_cols = None;
116+
let (s, parts) = FuseTable::to_partitions(
117+
None,
118+
&blocks_metas,
119+
&column_nodes,
120+
None,
121+
None,
122+
bloom_index_cols,
123+
);
116124
assert_eq!(parts.len(), num_of_block as usize);
117125
let expected_block_size: u64 = cols_metas
118126
.values()
@@ -140,8 +148,15 @@ fn test_to_partitions() -> Result<()> {
140148
..Default::default()
141149
});
142150

143-
let (stats, parts) =
144-
FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, push_down);
151+
let bloom_index_cols = None;
152+
let (stats, parts) = FuseTable::to_partitions(
153+
None,
154+
&blocks_metas,
155+
&column_nodes,
156+
None,
157+
push_down,
158+
bloom_index_cols,
159+
);
145160
assert_eq!(parts.len(), num_of_block as usize);
146161
assert_eq!(expected_block_size * num_of_block, stats.read_bytes as u64);
147162

src/query/sql/src/planner/bloom_index.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use databend_common_ast::parser::tokenize_sql;
2020
use databend_common_ast::parser::Dialect;
2121
use databend_common_exception::ErrorCode;
2222
use databend_common_exception::Result;
23-
use databend_common_expression::is_stream_column_id;
23+
use databend_common_expression::{is_stream_column_id, TableSchema};
2424
use databend_common_expression::ComputedExpr;
2525
use databend_common_expression::FieldIndex;
2626
use databend_common_expression::TableDataType;
@@ -32,7 +32,7 @@ use databend_common_settings::Settings;
3232
use crate::normalize_identifier;
3333
use crate::planner::semantic::NameResolutionContext;
3434

35-
#[derive(Clone)]
35+
#[derive(Clone, serde::Serialize, serde::Deserialize, PartialEq, Debug)]
3636
pub enum BloomIndexColumns {
3737
/// Default, all columns that support bloom index.
3838
All,
@@ -111,7 +111,7 @@ impl BloomIndexColumns {
111111
/// Get table field based on the BloomIndexColumns and schema.
112112
pub fn bloom_index_fields<F>(
113113
&self,
114-
schema: TableSchemaRef,
114+
schema: &TableSchema,
115115
verify_type: F,
116116
) -> Result<BTreeMap<FieldIndex, TableField>>
117117
where

src/query/storages/fuse/src/fuse_part.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,19 @@ use databend_common_exception::ErrorCode;
2929
use databend_common_exception::Result;
3030
use databend_common_expression::ColumnId;
3131
use databend_common_expression::Scalar;
32+
use databend_common_sql::BloomIndexColumns;
3233
use databend_storages_common_pruner::BlockMetaIndex;
3334
use databend_storages_common_table_meta::meta::ColumnMeta;
3435
use databend_storages_common_table_meta::meta::ColumnStatistics;
3536
use databend_storages_common_table_meta::meta::Compression;
3637
use databend_storages_common_table_meta::meta::Location;
3738

39+
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)]
40+
pub struct BloomIndexDescriptor {
41+
pub bloom_index_location: Option<Location>,
42+
pub bloom_index_size: u64,
43+
pub bloom_index_cols: BloomIndexColumns,
44+
}
3845
/// Fuse table partition information.
3946
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)]
4047
pub struct FuseBlockPartInfo {
@@ -48,6 +55,8 @@ pub struct FuseBlockPartInfo {
4855

4956
pub sort_min_max: Option<(Scalar, Scalar)>,
5057
pub block_meta_index: Option<BlockMetaIndex>,
58+
59+
pub bloom_index_descriptor: Option<BloomIndexDescriptor>,
5160
}
5261

5362
#[typetag::serde(name = "fuse")]
@@ -84,6 +93,7 @@ impl FuseBlockPartInfo {
8493
sort_min_max: Option<(Scalar, Scalar)>,
8594
block_meta_index: Option<BlockMetaIndex>,
8695
create_on: Option<DateTime<Utc>>,
96+
bloom_index_descriptor: Option<BloomIndexDescriptor>,
8797
) -> Arc<Box<dyn PartInfo>> {
8898
Arc::new(Box::new(FuseBlockPartInfo {
8999
location,
@@ -94,6 +104,7 @@ impl FuseBlockPartInfo {
94104
sort_min_max,
95105
block_meta_index,
96106
columns_stat,
107+
bloom_index_descriptor,
97108
}))
98109
}
99110

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ impl AggIndexReader {
4646
.enumerate()
4747
.map(|(i, c)| (i as u32, ColumnMeta::Native(c)))
4848
.collect();
49+
let bloom_index_cols = None;
4950
let part = FuseBlockPartInfo::create(
5051
loc.to_string(),
5152
num_rows,
@@ -55,6 +56,7 @@ impl AggIndexReader {
5556
None,
5657
None,
5758
None,
59+
bloom_index_cols,
5860
);
5961
let res = self
6062
.reader
@@ -98,6 +100,7 @@ impl AggIndexReader {
98100
.enumerate()
99101
.map(|(i, c)| (i as u32, ColumnMeta::Native(c)))
100102
.collect();
103+
let bloom_index_cols = None;
101104
let part = FuseBlockPartInfo::create(
102105
loc.to_string(),
103106
num_rows,
@@ -107,6 +110,7 @@ impl AggIndexReader {
107110
None,
108111
None,
109112
None,
113+
bloom_index_cols,
110114
);
111115
let res = self
112116
.reader

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ impl AggIndexReader {
4040
debug_assert_eq!(metadata.row_groups.len(), 1);
4141
let row_group = &metadata.row_groups[0];
4242
let columns_meta = build_columns_meta(row_group);
43+
let bloom_index_cols = None;
4344
let part = FuseBlockPartInfo::create(
4445
loc.to_string(),
4546
row_group.num_rows() as u64,
@@ -49,6 +50,7 @@ impl AggIndexReader {
4950
None,
5051
None,
5152
None,
53+
bloom_index_cols,
5254
);
5355
let res = self
5456
.reader
@@ -90,6 +92,7 @@ impl AggIndexReader {
9092
.await
9193
.inspect_err(|e| debug!("Read aggregating index `{loc}` failed: {e}"))
9294
.ok()?;
95+
let bloom_index_cols = None;
9396
let part = FuseBlockPartInfo::create(
9497
loc.to_string(),
9598
row_group.num_rows() as u64,
@@ -99,6 +102,7 @@ impl AggIndexReader {
99102
None,
100103
None,
101104
None,
105+
bloom_index_cols,
102106
);
103107
Some((part, res))
104108
}

src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ impl VirtualColumnReader {
8282
let (ranges, ignore_column_ids) = self.read_columns_meta(&schema, &columns_meta);
8383

8484
if !ranges.is_empty() {
85+
let bloom_index_cols = None;
8586
let part = FuseBlockPartInfo::create(
8687
loc.to_string(),
8788
row_group.num_rows() as u64,
@@ -91,6 +92,7 @@ impl VirtualColumnReader {
9192
None,
9293
None,
9394
None,
95+
bloom_index_cols,
9496
);
9597

9698
let merge_io_result =
@@ -124,6 +126,7 @@ impl VirtualColumnReader {
124126
let (ranges, ignore_column_ids) = self.read_columns_meta(&schema, &columns_meta);
125127

126128
if !ranges.is_empty() {
129+
let bloom_index_cols = None;
127130
let part = FuseBlockPartInfo::create(
128131
loc.to_string(),
129132
row_group.num_rows() as u64,
@@ -133,6 +136,7 @@ impl VirtualColumnReader {
133136
None,
134137
None,
135138
None,
139+
bloom_index_cols,
136140
);
137141

138142
let merge_io_result = BlockReader::merge_io_read(

0 commit comments

Comments (0)