Skip to content

Commit 14819b3

Browse files
authored
feat(query): support inverted_filter to omit the filter executor (#12934)
* feat(query): support inverted_filter to omit the filter executor * feat(query): support inverted_filter to omit the filter executor * feat(query): support inverted_filter to omit the filter executor * feat(query): support inverted_filter to omit the filter executor * feat(query): support inverted_filter to omit the filter executor * feat(query): address comments
1 parent 2d61b2f commit 14819b3

File tree

33 files changed

+363
-192
lines changed

33 files changed

+363
-192
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_metrics::register_counter;
16+
use common_metrics::Counter;
17+
use lazy_static::lazy_static;
18+
19+
// Lazily-initialized counters registered under the metric names
// "omit_filter_rowgroups" and "omit_filter_rows".
// NOTE(review): judging by the names and the commit title, these presumably count
// row groups / rows for which the filter executor was omitted — confirm at call sites.
lazy_static! {
20+
static ref OMIT_FILTER_ROWGROUPS: Counter = register_counter("omit_filter_rowgroups");
21+
static ref OMIT_FILTER_ROWS: Counter = register_counter("omit_filter_rows");
22+
}
23+
24+
/// Adds `c` to the `OMIT_FILTER_ROWGROUPS` counter.
pub fn metrics_inc_omit_filter_rowgroups(c: u64) {
25+
OMIT_FILTER_ROWGROUPS.inc_by(c);
26+
}
27+
28+
/// Adds `c` to the `OMIT_FILTER_ROWS` counter.
pub fn metrics_inc_omit_filter_rows(c: u64) {
29+
OMIT_FILTER_ROWS.inc_by(c);
30+
}

src/common/storage/src/metrics/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
pub mod common;
1516
pub mod copy;
1617
pub mod merge_into;
1718
mod storage_metrics;

src/query/catalog/src/plan/pushdown.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ pub struct PushDownInfo {
7777
/// The difference with `projection` is the removal of the source columns
7878
/// which were only used to generate virtual columns.
7979
pub output_columns: Option<Projection>,
80-
/// Optional filter expression plan
80+
/// Optional filter and reverse filter expression plan
8181
/// Assumption: expression's data type must be `DataType::Boolean`.
82-
pub filter: Option<RemoteExpr<String>>,
82+
pub filters: Option<Filters>,
8383
pub is_deterministic: bool,
8484
/// Optional prewhere information
8585
/// used for prewhere optimization
@@ -96,6 +96,12 @@ pub struct PushDownInfo {
9696
pub agg_index: Option<AggIndexInfo>,
9797
}
9898

99+
/// A push-down filter together with its negation.
///
/// Carried by `PushDownInfo::filters`; it takes over the role of the former
/// `DeletionFilters` struct, which had the same two fields.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
100+
pub struct Filters {
101+
/// The filter expression itself.
pub filter: RemoteExpr<String>,
102+
/// The negated filter, i.e. `not(filter)`.
pub inverted_filter: RemoteExpr<String>,
103+
}
104+
99105
/// TopK is a wrapper for topk push down items.
100106
/// We only take the first column in order_by as the topk column.
101107
#[derive(Debug, Clone)]

src/query/catalog/src/table.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -465,14 +465,6 @@ pub struct NavigationDescriptor {
465465
pub point: NavigationPoint,
466466
}
467467

468-
#[derive(Debug, Clone)]
469-
pub struct DeletionFilters {
470-
// the filter expression for the deletion
471-
pub filter: RemoteExpr<String>,
472-
// just "not(filter)"
473-
pub inverted_filter: RemoteExpr<String>,
474-
}
475-
476468
use std::collections::HashMap;
477469

478470
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]

src/query/expression/src/expression.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,17 @@ pub enum Expr<Index: ColumnIndex = usize> {
124124
///
125125
/// The remote node will recover the `Arc` pointer within `FunctionCall` by looking
126126
/// up the function registry with the `FunctionID`.
127-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
127+
#[derive(Debug, Clone, Educe, Serialize, Deserialize)]
128+
#[educe(PartialEq, Eq, Hash)]
128129
pub enum RemoteExpr<Index: ColumnIndex = usize> {
129130
Constant {
131+
#[educe(Hash(ignore), PartialEq(ignore), Eq(ignore))]
130132
span: Span,
131133
scalar: Scalar,
132134
data_type: DataType,
133135
},
134136
ColumnRef {
137+
#[educe(Hash(ignore), PartialEq(ignore), Eq(ignore))]
135138
span: Span,
136139
id: Index,
137140
data_type: DataType,
@@ -140,19 +143,22 @@ pub enum RemoteExpr<Index: ColumnIndex = usize> {
140143
display_name: String,
141144
},
142145
Cast {
146+
#[educe(Hash(ignore), PartialEq(ignore), Eq(ignore))]
143147
span: Span,
144148
is_try: bool,
145149
expr: Box<RemoteExpr<Index>>,
146150
dest_type: DataType,
147151
},
148152
FunctionCall {
153+
#[educe(Hash(ignore), PartialEq(ignore), Eq(ignore))]
149154
span: Span,
150155
id: FunctionID,
151156
generics: Vec<DataType>,
152157
args: Vec<RemoteExpr<Index>>,
153158
return_type: DataType,
154159
},
155160
UDFServerCall {
161+
#[educe(Hash(ignore), PartialEq(ignore), Eq(ignore))]
156162
span: Span,
157163
func_name: String,
158164
server_addr: String,

src/query/service/src/interpreters/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,6 @@ pub use refresh_aggregating_index::hook_refresh_agg_index;
2525
pub use refresh_aggregating_index::RefreshAggIndexDesc;
2626
pub use table::check_referenced_computed_columns;
2727
pub use util::check_deduplicate_label;
28+
pub use util::create_push_down_filters;
2829

2930
pub use self::metrics::*;

src/query/service/src/interpreters/common/util.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,17 @@
1414

1515
use std::sync::Arc;
1616

17+
use common_catalog::plan::Filters;
1718
use common_catalog::table_context::TableContext;
1819
use common_exception::Result;
20+
use common_expression::type_check::check_function;
21+
use common_functions::BUILTIN_FUNCTIONS;
1922
use common_meta_kvapi::kvapi::KVApi;
2023
use common_users::UserApiProvider;
2124

25+
use crate::sql::executor::cast_expr_to_non_null_boolean;
26+
use crate::sql::ScalarExpr;
27+
2228
/// Checks if a duplicate label exists in the meta store.
2329
///
2430
/// # Arguments
@@ -41,3 +47,22 @@ pub async fn check_deduplicate_label(ctx: Arc<dyn TableContext>) -> Result<bool>
4147
}
4248
}
4349
}
50+
51+
/// Builds the [`Filters`] pair (filter and inverted filter) for a push-down predicate.
///
/// The scalar is converted to an expression whose column references are
/// projected to their column names, then passed through
/// `cast_expr_to_non_null_boolean`. The inverted filter is the builtin `not`
/// function applied to that same expression. Both are returned as remote
/// expressions.
///
/// # Errors
///
/// Propagates any error from `as_expr`, `cast_expr_to_non_null_boolean`,
/// or `check_function`.
pub fn create_push_down_filters(scalar: &ScalarExpr) -> Result<Filters> {
52+
let filter = cast_expr_to_non_null_boolean(
53+
scalar
54+
.as_expr()?
55+
.project_column_ref(|col| col.column_name.clone()),
56+
)?;
57+
58+
// Take the remote form first: `filter` is moved into the `not` call below.
let remote_filter = filter.as_remote_expr();
59+
60+
// prepare the inverse filter expression
61+
let remote_inverted_filter =
62+
check_function(None, "not", &[], &[filter], &BUILTIN_FUNCTIONS)?.as_remote_expr();
63+
64+
Ok(Filters {
65+
filter: remote_filter,
66+
inverted_filter: remote_inverted_filter,
67+
})
68+
}

src/query/service/src/interpreters/interpreter_delete.rs

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use std::sync::Arc;
1818

1919
use common_base::runtime::GlobalIORuntime;
2020
use common_catalog::plan::Partitions;
21-
use common_catalog::table::DeletionFilters;
2221
use common_exception::ErrorCode;
2322
use common_exception::Result;
2423
use common_expression::types::DataType;
@@ -30,7 +29,6 @@ use common_functions::BUILTIN_FUNCTIONS;
3029
use common_meta_app::schema::CatalogInfo;
3130
use common_meta_app::schema::TableInfo;
3231
use common_sql::binder::ColumnBindingBuilder;
33-
use common_sql::executor::cast_expr_to_non_null_boolean;
3432
use common_sql::executor::DeletePartial;
3533
use common_sql::executor::Exchange;
3634
use common_sql::executor::FragmentKind;
@@ -60,6 +58,7 @@ use log::debug;
6058
use storages_common_table_meta::meta::TableSnapshot;
6159
use table_lock::TableLockHandlerWrapper;
6260

61+
use crate::interpreters::common::create_push_down_filters;
6362
use crate::interpreters::Interpreter;
6463
use crate::interpreters::SelectInterpreter;
6564
use crate::pipelines::executor::ExecutorSettings;
@@ -164,36 +163,15 @@ impl Interpreter for DeleteInterpreter {
164163

165164
let (filters, col_indices) = if let Some(scalar) = selection {
166165
// prepare the filter expression
167-
let filter = cast_expr_to_non_null_boolean(
168-
scalar
169-
.as_expr()?
170-
.project_column_ref(|col| col.column_name.clone()),
171-
)?
172-
.as_remote_expr();
173-
174-
let expr = filter.as_expr(&BUILTIN_FUNCTIONS);
166+
let filters = create_push_down_filters(&scalar)?;
167+
168+
let expr = filters.filter.as_expr(&BUILTIN_FUNCTIONS);
175169
if !expr.is_deterministic(&BUILTIN_FUNCTIONS) {
176170
return Err(ErrorCode::Unimplemented(
177171
"Delete must have deterministic predicate",
178172
));
179173
}
180174

181-
// prepare the inverse filter expression
182-
let inverted_filter = {
183-
let inverse = ScalarExpr::FunctionCall(common_sql::planner::plans::FunctionCall {
184-
span: None,
185-
func_name: "not".to_string(),
186-
params: vec![],
187-
arguments: vec![scalar.clone()],
188-
});
189-
cast_expr_to_non_null_boolean(
190-
inverse
191-
.as_expr()?
192-
.project_column_ref(|col| col.column_name.clone()),
193-
)?
194-
.as_remote_expr()
195-
};
196-
197175
let col_indices: Vec<usize> = if !self.plan.subquery_desc.is_empty() {
198176
let mut col_indices = HashSet::new();
199177
for subquery_desc in &self.plan.subquery_desc {
@@ -203,13 +181,7 @@ impl Interpreter for DeleteInterpreter {
203181
} else {
204182
scalar.used_columns().into_iter().collect()
205183
};
206-
(
207-
Some(DeletionFilters {
208-
filter,
209-
inverted_filter,
210-
}),
211-
col_indices,
212-
)
184+
(Some(filters), col_indices)
213185
} else {
214186
(None, vec![])
215187
};

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ use std::sync::Arc;
1616
use std::time::Duration;
1717
use std::time::SystemTime;
1818

19+
use common_catalog::plan::Filters;
1920
use common_catalog::plan::PushDownInfo;
2021
use common_exception::ErrorCode;
2122
use common_exception::Result;
23+
use common_expression::type_check::check_function;
24+
use common_functions::BUILTIN_FUNCTIONS;
2225
use log::error;
2326
use log::info;
2427
use log::warn;
@@ -31,6 +34,7 @@ use crate::pipelines::Pipeline;
3134
use crate::pipelines::PipelineBuildResult;
3235
use crate::sessions::QueryContext;
3336
use crate::sessions::TableContext;
37+
use crate::sql::executor::cast_expr_to_non_null_boolean;
3438
use crate::sql::plans::ReclusterTablePlan;
3539

3640
pub struct ReclusterTableInterpreter {
@@ -68,13 +72,23 @@ impl Interpreter for ReclusterTableInterpreter {
6872

6973
// Build extras via push down scalar
7074
let extras = if let Some(scalar) = &plan.push_downs {
71-
let filter = scalar
72-
.as_expr()?
73-
.project_column_ref(|col| col.column_name.clone())
74-
.as_remote_expr();
75+
// prepare the filter expression
76+
let filter = cast_expr_to_non_null_boolean(
77+
scalar
78+
.as_expr()?
79+
.project_column_ref(|col| col.column_name.clone()),
80+
)?;
81+
// prepare the inverse filter expression
82+
let inverted_filter =
83+
check_function(None, "not", &[], &[filter.clone()], &BUILTIN_FUNCTIONS)?;
84+
85+
let filters = Filters {
86+
filter: filter.as_remote_expr(),
87+
inverted_filter: inverted_filter.as_remote_expr(),
88+
};
7589

7690
Some(PushDownInfo {
77-
filter: Some(filter),
91+
filters: Some(filters),
7892
..PushDownInfo::default()
7993
})
8094
} else {

src/query/service/src/table_functions/numbers/numbers_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ impl Table for NumbersTable {
137137
let mut limit = None;
138138

139139
if let Some(extras) = &push_downs {
140-
if extras.limit.is_some() && extras.filter.is_none() && extras.order_by.is_empty() {
140+
if extras.limit.is_some() && extras.filters.is_none() && extras.order_by.is_empty() {
141141
// It is allowed to have an error when we can't get sort columns from the expression. For
142142
// example 'select number from numbers(10) order by number+4 limit 10', the column 'number+4'
143143
// doesn't exist in the numbers table.

0 commit comments

Comments
(0)