Skip to content

Commit 8134df7

Browse files
committed
Tweak runtime filter expressions
- Convert zero-range filters to equivalent (`eq`) expressions to make them compatible with both range filters and bloom filters. - Constant-fold the `eq` expressions to avoid issues where `Expr::cast` could prevent the bloom filter from functioning correctly.
1 parent 67e2885 commit 8134df7

File tree

6 files changed

+67
-31
lines changed

6 files changed

+67
-31
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -962,68 +962,69 @@ impl HashJoinBuildState {
962962
}
963963
// Generate min max filter using build column
964964
let min_max = build_key_column.remove_nullable().domain();
965+
965966
let min_max_filter = match min_max {
966967
Domain::Number(domain) => match domain {
967968
NumberDomain::UInt8(simple_domain) => {
968969
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
969970
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
970-
min_max_filter(min, max, probe_key)?
971+
min_max_filter(&self.func_ctx, min, max, probe_key)?
971972
}
972973
NumberDomain::UInt16(simple_domain) => {
973974
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
974975
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
975-
min_max_filter(min, max, probe_key)?
976+
min_max_filter(&self.func_ctx, min, max, probe_key)?
976977
}
977978
NumberDomain::UInt32(simple_domain) => {
978979
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
979980
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
980-
min_max_filter(min, max, probe_key)?
981+
min_max_filter(&self.func_ctx, min, max, probe_key)?
981982
}
982983
NumberDomain::UInt64(simple_domain) => {
983984
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
984985
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
985-
min_max_filter(min, max, probe_key)?
986+
min_max_filter(&self.func_ctx, min, max, probe_key)?
986987
}
987988
NumberDomain::Int8(simple_domain) => {
988989
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
989990
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
990-
min_max_filter(min, max, probe_key)?
991+
min_max_filter(&self.func_ctx, min, max, probe_key)?
991992
}
992993
NumberDomain::Int16(simple_domain) => {
993994
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
994995
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
995-
min_max_filter(min, max, probe_key)?
996+
min_max_filter(&self.func_ctx, min, max, probe_key)?
996997
}
997998
NumberDomain::Int32(simple_domain) => {
998999
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
9991000
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
1000-
min_max_filter(min, max, probe_key)?
1001+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10011002
}
10021003
NumberDomain::Int64(simple_domain) => {
10031004
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
10041005
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
1005-
min_max_filter(min, max, probe_key)?
1006+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10061007
}
10071008
NumberDomain::Float32(simple_domain) => {
10081009
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
10091010
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
1010-
min_max_filter(min, max, probe_key)?
1011+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10111012
}
10121013
NumberDomain::Float64(simple_domain) => {
10131014
let min = Scalar::Number(NumberScalar::from(simple_domain.min));
10141015
let max = Scalar::Number(NumberScalar::from(simple_domain.max));
1015-
min_max_filter(min, max, probe_key)?
1016+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10161017
}
10171018
},
10181019
Domain::String(domain) => {
10191020
let min = Scalar::String(domain.min);
10201021
let max = Scalar::String(domain.max.unwrap());
1021-
min_max_filter(min, max, probe_key)?
1022+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10221023
}
10231024
Domain::Date(date_domain) => {
10241025
let min = Scalar::Date(date_domain.min);
10251026
let max = Scalar::Date(date_domain.max);
1026-
min_max_filter(min, max, probe_key)?
1027+
min_max_filter(&self.func_ctx, min, max, probe_key)?
10271028
}
10281029
_ => unreachable!(),
10291030
};

src/query/service/src/pipelines/processors/transforms/hash_join/util.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use databend_common_expression::type_check;
1717
use databend_common_expression::types::AnyType;
1818
use databend_common_expression::types::DataType;
1919
use databend_common_expression::Column;
20+
use databend_common_expression::ConstantFolder;
2021
use databend_common_expression::DataBlock;
2122
use databend_common_expression::DataField;
2223
use databend_common_expression::DataSchemaRef;
@@ -233,10 +234,17 @@ where
233234

234235
// Generate min max runtime filter
235236
pub(crate) fn min_max_filter(
237+
func_ctx: &FunctionContext,
236238
min: Scalar,
237239
max: Scalar,
238240
probe_key: &Expr<String>,
239241
) -> Result<Option<Expr<String>>> {
242+
if min == max {
243+
// if min equals max, return an `eq` expression
244+
// which can be used by both range filter and bloom filter
245+
return eq_filter(func_ctx, min, probe_key);
246+
}
247+
240248
if let Expr::ColumnRef {
241249
span,
242250
id,
@@ -283,3 +291,43 @@ pub(crate) fn min_max_filter(
283291
}
284292
Ok(None)
285293
}
294+
295+
fn eq_filter(
296+
func_ctx: &FunctionContext,
297+
scalar: Scalar,
298+
probe_key: &Expr<String>,
299+
) -> Result<Option<Expr<String>>> {
300+
if let Expr::ColumnRef {
301+
span,
302+
id,
303+
data_type,
304+
display_name,
305+
} = probe_key
306+
{
307+
let raw_probe_key = RawExpr::ColumnRef {
308+
span: *span,
309+
id: id.to_string(),
310+
data_type: data_type.clone(),
311+
display_name: display_name.clone(),
312+
};
313+
314+
let min = RawExpr::Constant { span: None, scalar };
315+
// Make the `eq` function call
316+
let eq_func = RawExpr::FunctionCall {
317+
span: None,
318+
name: "eq".to_string(),
319+
params: vec![],
320+
args: vec![raw_probe_key.clone(), min],
321+
};
322+
let expr = type_check::check(&eq_func, &BUILTIN_FUNCTIONS)?;
323+
324+
// Fold
325+
// `Cast { expr: Constant { scalar: Number(50_u64), data_type: T }, dest_type: Nullable(T) }`
326+
// to
327+
// `Constant { scalar: Number(50_u64), data_type: Nullable(T) }`
328+
// so that the expression can be utilized by bloom filter
329+
let (expr, _) = ConstantFolder::fold(&expr, func_ctx, &BUILTIN_FUNCTIONS);
330+
return Ok(Some(expr));
331+
}
332+
Ok(None)
333+
}

src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,6 @@ impl Processor for ReadParquetDataSource<false> {
244244
);
245245
let mut fuse_part_infos = Vec::with_capacity(parts.len());
246246
for part in parts.into_iter() {
247-
eprintln!("applying range filter");
248247
if runtime_filter_pruner(
249248
self.table_schema.clone(),
250249
&part,
@@ -254,7 +253,6 @@ impl Processor for ReadParquetDataSource<false> {
254253
continue;
255254
}
256255

257-
eprintln!("applying bloom filter");
258256
if runtime_bloom_filter_pruner(
259257
self.table_schema.clone(),
260258
&part,

src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,20 +126,15 @@ pub async fn runtime_bloom_filter_pruner(
126126
Some(filter_expr),
127127
bloom_index_cols.clone(),
128128
)? {
129-
eprintln!("got bloom pruner");
130129
let should_keep = bloom_pruner
131130
.should_keep(&index_location, index_size, column_ids.clone())
132131
.await;
133132
if !should_keep {
134133
pruned = true;
135134
break;
136135
}
137-
} else {
138-
eprintln!("not suitable bloom pruner found")
139136
}
140137
}
141-
} else {
142-
eprintln!("not bloom_index_descriptor found");
143138
}
144139

145140
if pruned {

src/query/storages/fuse/src/operations/read_partitions.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -494,14 +494,11 @@ impl FuseTable {
494494
.unwrap_or((default.clone(), default.clone()))
495495
});
496496

497-
let bloom_desc = {
498-
// TODO none if BloomIndexColumns::None
499-
bloom_index_cols.map(|v| BloomIndexDescriptor {
500-
bloom_index_location: meta.bloom_filter_index_location.clone(),
501-
bloom_index_size: meta.block_size,
502-
bloom_index_cols: v,
503-
})
504-
};
497+
let bloom_desc = bloom_index_cols.map(|v| BloomIndexDescriptor {
498+
bloom_index_location: meta.bloom_filter_index_location.clone(),
499+
bloom_index_size: meta.bloom_filter_index_size,
500+
bloom_index_cols: v,
501+
});
505502

506503
FuseBlockPartInfo::create(
507504
location,
@@ -556,7 +553,7 @@ impl FuseTable {
556553

557554
let bloom_desc = bloom_index_cols.map(|v| BloomIndexDescriptor {
558555
bloom_index_location: meta.bloom_filter_index_location.clone(),
559-
bloom_index_size: meta.block_size,
556+
bloom_index_size: meta.bloom_filter_index_size,
560557
bloom_index_cols: v,
561558
});
562559
FuseBlockPartInfo::create(

src/query/storages/fuse/src/pruning/bloom_pruner.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,6 @@ impl BloomPrunerCreator {
7777
let bloom_column_fields = bloom_columns_map.values().cloned().collect::<Vec<_>>();
7878
let point_query_cols = BloomIndex::find_eq_columns(expr, bloom_column_fields)?;
7979

80-
eprintln!("is point_query empty? {}", point_query_cols.is_empty());
81-
eprintln!("expr ? {}", expr);
82-
8380
if !point_query_cols.is_empty() {
8481
// convert to filter column names
8582
let mut filter_fields = Vec::with_capacity(point_query_cols.len());

0 commit comments

Comments
 (0)