Commit 985d6f8

Merge pull request #7725 from RinChanNOWWW/nullable_pruner
feat(query): enable range filter to pre-filter `is_null` and `is_not_null` ops.
2 parents 254feaf + 88b0854 commit 985d6f8
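
In essence, this change lets the range filter evaluate `is_null` and `is_not_null` predicates from per-column null counts together with the block (or segment) row count, instead of the previous `is_not_null(min_a)` form. A minimal sketch of the two keep/prune decisions, not part of the commit and using made-up helper names, assuming the statistics expose a per-column null count and a total row count:

fn keep_for_is_null(null_count: u64) -> bool {
    // keep the block if it may contain at least one NULL row
    null_count > 0
}

fn keep_for_is_not_null(null_count: u64, row_count: u64) -> bool {
    // prune the block only when every row is NULL
    null_count != row_count
}

These mirror the verifiable expressions the builder now emits: `nulls_a > 0` for `a is null` and `(nulls_a != row_count_a)` for `a is not null`.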

File tree

6 files changed: +108 / -21 lines


src/query/service/tests/it/storages/index/range_filter.rs

Lines changed: 8 additions & 2 deletions

@@ -91,6 +91,12 @@ async fn test_range_filter() -> Result<()> {
         Test {
             name: "a is not null",
             expr: Expression::create_scalar_function("is_not_null", vec![col("a")]),
+            expect: false,
+            error: "",
+        },
+        Test {
+            name: "b is not null",
+            expr: Expression::create_scalar_function("is_not_null", vec![col("b")]),
             expect: true,
             error: "",
         },
@@ -192,7 +198,7 @@ async fn test_range_filter() -> Result<()> {
     for test in tests {
         let prune = RangeFilter::try_create(ctx.clone(), &[test.expr], schema.clone())?;
 
-        match prune.eval(&stats) {
+        match prune.eval(&stats, 1) {
             Ok(actual) => assert_eq!(test.expect, actual, "{:#?}", test.name),
             Err(e) => assert_eq!(test.error, e.to_string(), "{}", test.name),
         }
@@ -239,7 +245,7 @@ fn test_build_verifiable_function() -> Result<()> {
         Test {
             name: "a is not null",
             expr: Expression::create_scalar_function("is_not_null", vec![col("a")]),
-            expect: "is_not_null(min_a)",
+            expect: "(nulls_a != row_count_a)",
         },
         Test {
             name: "b >= 0 and c like 0xffffff",

src/query/storages/fuse/src/pruning/pruning_executor.rs

Lines changed: 7 additions & 2 deletions

@@ -118,14 +118,19 @@ impl BlockPruner {
             }
             let segment_info = segment_reader.read(seg_loc, None, ver).await?;
             let mut result = Vec::with_capacity(segment_info.blocks.len());
-            if range_filter_pruner.should_keep(&segment_info.summary.col_stats) {
+            if range_filter_pruner.should_keep(
+                &segment_info.summary.col_stats,
+                segment_info.summary.row_count,
+            ) {
                 for block_meta in &segment_info.blocks {
                     // prune block using range filter
                     if limiter.exceeded() {
                         // before using bloom index to prune, check if limit already exceeded
                         return Ok(result);
                     }
-                    if range_filter_pruner.should_keep(&block_meta.col_stats) {
+                    if range_filter_pruner
+                        .should_keep(&block_meta.col_stats, block_meta.row_count)
+                    {
                         // prune block using bloom filter
                         if bloom_filter_pruner
                             .should_keep(

src/query/storages/fuse/src/pruning/range_pruner.rs

Lines changed: 5 additions & 5 deletions

@@ -23,28 +23,28 @@ use common_storages_index::RangeFilter;
 
 pub trait RangeFilterPruner {
     // returns ture, if target should NOT be pruned (false positive allowed)
-    fn should_keep(&self, input: &StatisticsOfColumns) -> bool;
+    fn should_keep(&self, input: &StatisticsOfColumns, row_count: u64) -> bool;
 }
 
 struct KeepTrue;
 
 impl RangeFilterPruner for KeepTrue {
-    fn should_keep(&self, _input: &StatisticsOfColumns) -> bool {
+    fn should_keep(&self, _input: &StatisticsOfColumns, _row_count: u64) -> bool {
         true
     }
 }
 
 struct KeepFalse;
 
 impl RangeFilterPruner for KeepFalse {
-    fn should_keep(&self, _input: &StatisticsOfColumns) -> bool {
+    fn should_keep(&self, _input: &StatisticsOfColumns, _row_count: u64) -> bool {
         false
     }
 }
 
 impl RangeFilterPruner for RangeFilter {
-    fn should_keep(&self, stats: &StatisticsOfColumns) -> bool {
-        match self.eval(stats) {
+    fn should_keep(&self, stats: &StatisticsOfColumns, row_count: u64) -> bool {
+        match self.eval(stats, row_count) {
             Ok(r) => r,
             Err(e) => {
                 // swallow exceptions intentionally, corrupted index should not prevent execution

src/query/storages/hive/src/hive_partition_pruner.rs

Lines changed: 1 addition & 1 deletion

@@ -99,7 +99,7 @@ impl HivePartitionPruner {
         let column_stats = self.get_column_stats(&partitions)?;
         let mut filted_partitions = vec![];
         for (idx, stats) in column_stats.into_iter().enumerate() {
-            if range_filter.eval(&stats)? {
+            if range_filter.eval(&stats, 1)? {
                 filted_partitions.push(partitions[idx].clone());
             }
         }

src/query/storages/index/src/range_filter.rs

Lines changed: 40 additions & 11 deletions

@@ -105,14 +105,18 @@ impl RangeFilter {
     }
 
     #[tracing::instrument(level = "debug", name = "range_filter_eval", skip_all)]
-    pub fn eval(&self, stats: &StatisticsOfColumns) -> Result<bool> {
+    pub fn eval(&self, stats: &StatisticsOfColumns, row_count: u64) -> Result<bool> {
         let mut columns = Vec::with_capacity(self.stat_columns.len());
         for col in self.stat_columns.iter() {
-            let val_opt = col.apply_stat_value(stats, self.origin.clone())?;
-            if val_opt.is_none() {
-                return Ok(true);
+            if col.stat_type == StatType::RowCount {
+                columns.push(Series::from_data(vec![row_count]));
+            } else {
+                let val_opt = col.apply_stat_value(stats, self.origin.clone())?;
+                if val_opt.is_none() {
+                    return Ok(true);
+                }
+                columns.push(val_opt.unwrap());
             }
-            columns.push(val_opt.unwrap());
         }
         let data_block = DataBlock::create(self.schema.clone(), columns);
         let executed_data_block = self.executor.execute(&data_block)?;
@@ -135,7 +139,7 @@ pub fn build_verifiable_expr(
 
     let (exprs, op) = match expr {
         Expression::Literal { .. } => return expr.clone(),
-        Expression::ScalarFunction { op, args } => (args.clone(), op.clone()),
+        Expression::ScalarFunction { op, args } => try_convert_is_null(op, args.clone()),
         Expression::BinaryExpression { left, op, right } => match op.to_lowercase().as_str() {
             "and" => {
                 let left = build_verifiable_expr(left, schema, stat_columns);
@@ -173,11 +177,30 @@ fn inverse_operator(op: &str) -> Result<&str> {
     }
 }
 
+/// Try to convert `not(is_not_null)` to `is_null`.
+fn try_convert_is_null(op: &str, args: Vec<Expression>) -> (Vec<Expression>, String) {
+    // `is null` will be converted to `not(is not null)` in the parser.
+    // we should convert it back to `is null` here.
+    if op == "not" && args.len() == 1 {
+        if let Expression::ScalarFunction {
+            op: inner_op,
+            args: inner_args,
+        } = &args[0]
+        {
+            if inner_op == "is_not_null" {
+                return (inner_args.clone(), String::from("is_null"));
+            }
+        }
+    }
+    (args, String::from(op))
+}
+
 #[derive(Debug, Copy, Clone, PartialEq)]
 enum StatType {
     Min,
     Max,
     Nulls,
+    RowCount,
 }
 
 impl fmt::Display for StatType {
@@ -186,6 +209,7 @@ impl fmt::Display for StatType {
             StatType::Min => write!(f, "min"),
             StatType::Max => write!(f, "max"),
             StatType::Nulls => write!(f, "nulls"),
+            StatType::RowCount => write!(f, "row_count"),
         }
     }
 }
@@ -209,7 +233,7 @@ impl StatColumn {
         expr: Expression,
     ) -> Self {
         let column_new = format!("{}_{}", stat_type, field.name());
-        let data_type = if matches!(stat_type, StatType::Nulls) {
+        let data_type = if matches!(stat_type, StatType::Nulls | StatType::RowCount) {
            u64::to_data_type()
         } else {
             field.data_type().clone()
@@ -400,15 +424,16 @@ impl<'a> VerifiableExprBuilder<'a> {
         // TODO: support in/not in.
         match self.op {
             "is_null" => {
+                // should_keep: col.null_count > 0
                 let nulls_expr = self.nulls_column_expr(0)?;
                 let scalar_expr = lit(0u64);
                 Ok(nulls_expr.gt(scalar_expr))
             }
             "is_not_null" => {
-                let left_min = self.min_column_expr(0)?;
-                Ok(Expression::create_scalar_function("is_not_null", vec![
-                    left_min,
-                ]))
+                // should_keep: col.null_count != col.row_count
+                let nulls_expr = self.nulls_column_expr(0)?;
+                let row_count_expr = self.row_count_column_expr(0)?;
+                Ok(nulls_expr.not_eq(row_count_expr))
             }
             "=" => {
                 // left = right => min_left <= max_right and max_left >= min_right
@@ -582,6 +607,10 @@ impl<'a> VerifiableExprBuilder<'a> {
     fn nulls_column_expr(&mut self, index: usize) -> Result<Expression> {
         self.stat_column_expr(StatType::Nulls, index)
     }
+
+    fn row_count_column_expr(&mut self, index: usize) -> Result<Expression> {
+        self.stat_column_expr(StatType::RowCount, index)
+    }
 }
 
 fn is_like_pattern_escape(c: u8) -> bool {
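
Taken together, `a IS NULL` reaches the builder as `not(is_not_null(a))` from the parser, `try_convert_is_null` maps it back to `is_null(a)`, and the builder turns it into the verifiable expression `nulls_a > 0`; `a IS NOT NULL` now becomes `(nulls_a != row_count_a)`, matching the updated expectation in the unit test above. The new `RowCount` stat column is filled from the `row_count` argument passed to `eval`, while the other stat columns are still resolved from the column statistics.
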
Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
+statement ok
+create table t_nullable_prune (a int null);
+
+statement ok
+insert into t_nullable_prune select * from numbers(3);
+
+statement ok
+insert into t_nullable_prune select null from numbers(3);
+
+statement query T
+explain select * from t_nullable_prune;
+
+----
+TableScan
+├── table: default.default.t_nullable_prune
+├── read rows: 6
+├── read bytes: 62
+├── partitions total: 2
+├── partitions scanned: 2
+└── push downs: [filters: [], limit: NONE]
+
+statement query T
+explain select * from t_nullable_prune where a is not null;
+
+----
+TableScan
+├── table: default.default.t_nullable_prune
+├── read rows: 3
+├── read bytes: 37
+├── partitions total: 2
+├── partitions scanned: 1
+└── push downs: [filters: [is_not_null(a)], limit: NONE]
+
+statement query T
+explain select * from t_nullable_prune where a is null;
+
+----
+TableScan
+├── table: default.default.t_nullable_prune
+├── read rows: 3
+├── read bytes: 25
+├── partitions total: 2
+├── partitions scanned: 1
+└── push downs: [filters: [not(is_not_null(a))], limit: NONE]
+
+statement ok
+DROP TABLE default.default.t_nullable_prune;
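
The logic test above shows the effect end to end: the table holds 6 rows in 2 partitions (3 integers and 3 NULLs inserted separately), and both the `a is not null` and the `a is null` query now scan only 1 of the 2 partitions, cutting read rows from 6 to 3.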
