Skip to content

Commit a34da6f

Browse files
authored
feat: support pushdown predicate into iceberg engine (#16650)
* feat: support pushdown predicate into iceberg engine * feat: support pushdown predicate into iceberg engine * fix tests * fix tests
1 parent 9d3cd15 commit a34da6f

File tree

10 files changed

+441
-21
lines changed

10 files changed

+441
-21
lines changed

src/query/expression/src/expression.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl<Index: ColumnIndex> PartialEq for Expr<Index> {
290290
///
291291
/// The remote node will recover the `Arc` pointer within `FunctionCall` by looking
292292
/// up the function registry with the `FunctionID`.
293-
#[derive(Debug, Clone, Educe, Serialize, Deserialize)]
293+
#[derive(Debug, Clone, Educe, Serialize, Deserialize, EnumAsInner)]
294294
#[educe(PartialEq, Eq, Hash)]
295295
pub enum RemoteExpr<Index: ColumnIndex = usize> {
296296
Constant {

src/query/expression/src/type_check.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub fn check<Index: ColumnIndex>(
7575
args,
7676
params,
7777
} => {
78-
let args_expr: Vec<_> = args
78+
let mut args_expr: Vec<_> = args
7979
.iter()
8080
.map(|arg| check(arg, fn_registry))
8181
.try_collect()?;
@@ -84,41 +84,32 @@ pub fn check<Index: ColumnIndex>(
8484
// c:int16 = 12456 will be resolved as `to_int32(c) == to_int32(12456)`
8585
// This may hurt the bloom filter, so we try to cast the literal to the data type of the column instead
8686
if name == "eq" && args_expr.len() == 2 {
87-
match args_expr.as_slice() {
87+
match args_expr.as_mut_slice() {
8888
[
8989
e,
9090
Expr::Constant {
9191
span,
9292
scalar,
93-
data_type: src_ty,
93+
data_type,
9494
},
9595
]
9696
| [
9797
Expr::Constant {
9898
span,
9999
scalar,
100-
data_type: src_ty,
100+
data_type,
101101
},
102102
e,
103103
] => {
104-
let src_ty = src_ty.remove_nullable();
104+
let src_ty = data_type.remove_nullable();
105105
let dest_ty = e.data_type().remove_nullable();
106106

107107
if dest_ty.is_integer() && src_ty.is_integer() {
108-
if let Ok(scalar) =
108+
if let Ok(casted_scalar) =
109109
cast_scalar(*span, scalar.clone(), dest_ty, fn_registry)
110110
{
111-
return check_function(
112-
*span,
113-
name,
114-
params,
115-
&[e.clone(), Expr::Constant {
116-
span: *span,
117-
data_type: scalar.as_ref().infer_data_type(),
118-
scalar,
119-
}],
120-
fn_registry,
121-
);
111+
*scalar = casted_scalar;
112+
*data_type = scalar.as_ref().infer_data_type();
122113
}
123114
}
124115
}

src/query/service/src/catalogs/default/immutable_catalog.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,11 @@ impl Catalog for ImmutableCatalog {
153153
CatalogInfo::default().into()
154154
}
155155

156+
/// Returns a new `Arc` that owns a clone of this catalog.
// NOTE(review): presumably this catalog has no table-info refresh to turn off, so a plain
// clone is enough — confirm against the `Catalog` trait contract.
fn disable_table_info_refresh(self: Arc<Self>) -> Result<Arc<dyn Catalog>> {
    let snapshot: Self = (*self).clone();
    Ok(Arc::new(snapshot))
}
160+
156161
#[async_backtrace::framed]
157162
async fn get_database(&self, _tenant: &Tenant, db_name: &str) -> Result<Arc<dyn Database>> {
158163
match db_name {

src/query/sql/src/planner/semantic/type_check.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2761,13 +2761,16 @@ impl<'a> TypeChecker<'a> {
27612761
params: params.clone(),
27622762
args: arguments,
27632763
};
2764+
27642765
let expr = type_check::check(&raw_expr, &BUILTIN_FUNCTIONS)?;
27652766

27662767
// Run constant folding for arguments of the scalar function.
27672768
// This will be helpful to simplify some constant expressions, especially
27682769
// the implicitly casted literal values, e.g. `timestamp > '2001-01-01'`
27692770
// will be folded from `timestamp > to_timestamp('2001-01-01')` to `timestamp > 978307200000000`
2770-
let folded_args = match &expr {
2771+
// Note: check function may reorder the args
2772+
2773+
let mut folded_args = match &expr {
27712774
databend_common_expression::Expr::FunctionCall {
27722775
args: checked_args, ..
27732776
} => {
@@ -2795,6 +2798,15 @@ impl<'a> TypeChecker<'a> {
27952798
return Ok(constant);
27962799
}
27972800

2801+
// reorder
2802+
if func_name == "eq"
2803+
&& folded_args.len() == 2
2804+
&& matches!(folded_args[0], ScalarExpr::ConstantExpr(_))
2805+
&& !matches!(folded_args[1], ScalarExpr::ConstantExpr(_))
2806+
{
2807+
folded_args.swap(0, 1);
2808+
}
2809+
27982810
Ok(Box::new((
27992811
FunctionCall {
28002812
span,

src/query/storages/iceberg/src/catalog.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ impl Catalog for IcebergCatalog {
216216
self.info.clone()
217217
}
218218

219+
/// Returns this catalog handle unchanged.
// NOTE(review): presumably there is no table-info refresh to disable for this catalog —
// confirm against the `Catalog` trait contract.
fn disable_table_info_refresh(self: Arc<Self>) -> Result<Arc<dyn Catalog>> {
220+
Ok(self)
221+
}
222+
219223
#[fastrace::trace]
220224
#[async_backtrace::framed]
221225
async fn get_database(&self, _tenant: &Tenant, db_name: &str) -> Result<Arc<dyn Database>> {

src/query/storages/iceberg/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
mod catalog;
2323
mod database;
2424
mod partition;
25+
mod predicate;
2526
mod table;
2627
mod table_source;
2728

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_expression::types::DataType;
16+
use databend_common_expression::types::NumberScalar;
17+
use databend_common_expression::RemoteExpr;
18+
use databend_common_expression::Scalar;
19+
use iceberg::expr::Predicate;
20+
use iceberg::expr::Reference;
21+
use iceberg::spec::Datum;
22+
23+
#[derive(Default, Copy, Clone, Debug)]
24+
pub struct PredicateBuilder {
25+
uncertain: bool,
26+
}
27+
28+
impl PredicateBuilder {
29+
pub fn build(&mut self, expr: &RemoteExpr<String>) -> Predicate {
30+
match expr {
31+
RemoteExpr::Constant {
32+
span: _,
33+
scalar,
34+
data_type,
35+
} if data_type.remove_nullable() == DataType::Boolean => {
36+
let value = scalar.as_boolean();
37+
let is_true = value.copied().unwrap_or(false);
38+
if is_true {
39+
Predicate::AlwaysTrue
40+
} else {
41+
Predicate::AlwaysFalse
42+
}
43+
}
44+
45+
// is_true
46+
RemoteExpr::FunctionCall {
47+
span: _,
48+
id,
49+
generics: _,
50+
args,
51+
return_type: _,
52+
} if args.len() == 1 && id.name().as_ref() == "is_true" => {
53+
let predicate = self.build(&args[0]);
54+
if self.uncertain {
55+
return Predicate::AlwaysTrue;
56+
}
57+
match predicate {
58+
Predicate::AlwaysTrue => Predicate::AlwaysTrue,
59+
Predicate::AlwaysFalse => Predicate::AlwaysFalse,
60+
_ => predicate,
61+
}
62+
}
63+
64+
// unary
65+
RemoteExpr::FunctionCall {
66+
span: _,
67+
id,
68+
generics: _,
69+
args,
70+
return_type: _,
71+
} if args.len() == 1 && matches!(args[0], RemoteExpr::ColumnRef { .. }) => {
72+
let (_, name, _, _) = args[0].as_column_ref().unwrap();
73+
let r = Reference::new(name);
74+
if let Some(op) = build_unary(r, id.name().as_ref()) {
75+
return op;
76+
}
77+
self.uncertain = true;
78+
Predicate::AlwaysTrue
79+
}
80+
81+
// not
82+
RemoteExpr::FunctionCall {
83+
span: _,
84+
id,
85+
generics: _,
86+
args,
87+
return_type: _,
88+
} if args.len() == 1 && id.name().as_ref() == "not" => {
89+
let predicate = self.build(&args[0]);
90+
if self.uncertain {
91+
return Predicate::AlwaysTrue;
92+
}
93+
match predicate {
94+
Predicate::AlwaysTrue => Predicate::AlwaysFalse,
95+
Predicate::AlwaysFalse => Predicate::AlwaysTrue,
96+
_ => predicate.negate(),
97+
}
98+
}
99+
100+
// binary {a op datum}
101+
RemoteExpr::FunctionCall {
102+
span: _,
103+
id,
104+
generics: _,
105+
args,
106+
return_type: _,
107+
} if args.len() == 2 && ["and", "and_filters", "or"].contains(&id.name().as_ref()) => {
108+
let left = self.build(&args[0]);
109+
let right = self.build(&args[1]);
110+
if self.uncertain {
111+
return Predicate::AlwaysTrue;
112+
}
113+
match id.name().as_ref() {
114+
"and" | "and_filters" => left.and(right),
115+
"or" => left.or(right),
116+
_ => unreachable!(),
117+
}
118+
}
119+
120+
// binary {a op datum}
121+
RemoteExpr::FunctionCall {
122+
span: _,
123+
id,
124+
generics: _,
125+
args,
126+
return_type: _,
127+
} if args.len() == 2
128+
&& matches!(args[0], RemoteExpr::ColumnRef { .. })
129+
&& matches!(args[1], RemoteExpr::Constant { .. }) =>
130+
{
131+
let val = args[1].as_constant().unwrap();
132+
let val = scalar_to_datatum(val.1);
133+
if let Some(datum) = val {
134+
let (_, name, _, _) = args[0].as_column_ref().unwrap();
135+
let r = Reference::new(name);
136+
let p = build_binary(r, id.name().as_ref(), datum);
137+
if let Some(op) = p {
138+
return op;
139+
}
140+
}
141+
self.uncertain = true;
142+
Predicate::AlwaysTrue
143+
}
144+
145+
// binary {datum op a}
146+
RemoteExpr::FunctionCall {
147+
span: _,
148+
id,
149+
generics: _,
150+
args,
151+
return_type: _,
152+
} if args.len() == 2
153+
&& matches!(args[1], RemoteExpr::ColumnRef { .. })
154+
&& matches!(args[0], RemoteExpr::Constant { .. }) =>
155+
{
156+
let val = args[0].as_constant().unwrap();
157+
let val = scalar_to_datatum(val.1);
158+
if let Some(datum) = val {
159+
let (_, name, _, _) = args[1].as_column_ref().unwrap();
160+
let r = Reference::new(name);
161+
let p = build_reverse_binary(r, id.name().as_ref(), datum);
162+
if let Some(op) = p {
163+
return op;
164+
}
165+
}
166+
self.uncertain = true;
167+
Predicate::AlwaysTrue
168+
}
169+
170+
_ => {
171+
self.uncertain = true;
172+
Predicate::AlwaysTrue
173+
}
174+
}
175+
}
176+
}
177+
178+
/// Maps a unary function name applied to the column `r` onto an iceberg predicate.
///
/// Only `is_null` and `is_not_null` are supported; any other name yields `None`.
fn build_unary(r: Reference, op: &str) -> Option<Predicate> {
    match op {
        "is_null" => Some(r.is_null()),
        "is_not_null" => Some(r.is_not_null()),
        _ => None,
    }
}
186+
187+
// a op datum
188+
fn build_binary(r: Reference, op: &str, datum: Datum) -> Option<Predicate> {
189+
let op = match op {
190+
"lt" | "<" => r.less_than(datum),
191+
"le" | "<=" => r.less_than_or_equal_to(datum),
192+
"gt" | ">" => r.greater_than(datum),
193+
"ge" | ">=" => r.greater_than_or_equal_to(datum),
194+
"eq" | "=" => r.equal_to(datum),
195+
"ne" | "!=" => r.not_equal_to(datum),
196+
_ => return None,
197+
};
198+
Some(op)
199+
}
200+
201+
// datum op a to a op_v datum
202+
fn build_reverse_binary(r: Reference, op: &str, datum: Datum) -> Option<Predicate> {
203+
let op = match op {
204+
"lt" | "<" => r.greater_than(datum),
205+
"le" | "<=" => r.greater_than_or_equal_to(datum),
206+
"gt" | ">" => r.less_than(datum),
207+
"ge" | ">=" => r.less_than_or_equal_to(datum),
208+
"eq" | "=" => r.equal_to(datum),
209+
"ne" | "!=" => r.not_equal_to(datum),
210+
_ => return None,
211+
};
212+
Some(op)
213+
}
214+
215+
fn scalar_to_datatum(scalar: &Scalar) -> Option<Datum> {
216+
let val = match scalar {
217+
Scalar::Number(n) => match n {
218+
NumberScalar::Int8(i) => Datum::int(*i as i32),
219+
NumberScalar::Int16(i) => Datum::int(*i as i32),
220+
NumberScalar::Int32(i) => Datum::int(*i),
221+
NumberScalar::Int64(i) => Datum::long(*i),
222+
NumberScalar::UInt8(i) => Datum::int(*i as i32),
223+
NumberScalar::UInt16(i) => Datum::int(*i as i32),
224+
NumberScalar::UInt32(i) if *i <= i32::MAX as u32 => Datum::int(*i as i32),
225+
NumberScalar::UInt64(i) if *i <= i64::MAX as u64 => Datum::long(*i as i64), /* Potential loss of precision */
226+
NumberScalar::Float32(f) => Datum::float(*f),
227+
NumberScalar::Float64(f) => Datum::double(*f),
228+
_ => return None,
229+
},
230+
Scalar::Timestamp(ts) => Datum::timestamp_micros(*ts),
231+
Scalar::Date(d) => Datum::date(*d),
232+
Scalar::Boolean(b) => Datum::bool(*b),
233+
Scalar::Binary(b) => Datum::binary(b.clone()),
234+
Scalar::String(s) => Datum::string(s),
235+
_ => return None,
236+
};
237+
Some(val)
238+
}

0 commit comments

Comments
 (0)