Skip to content

Commit 4a386ea

Browse files
authored
refactor: enables the bloom runtime filter to be turned on adaptively (#14686)
* feat: enables the bloom runtime filter to be turned on adaptively * fix * refactor
1 parent fe491cb commit 4a386ea

File tree

13 files changed

+61
-51
lines changed

13 files changed

+61
-51
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ pub struct HashJoinDesc {
4747
pub(crate) probe_keys_rt: Vec<Option<(Expr<String>, IndexType)>>,
4848
// Under cluster, mark if the join is broadcast join.
4949
pub broadcast: bool,
50+
// If enable bloom runtime filter
51+
pub enable_bloom_runtime_filter: bool,
5052
}
5153

5254
impl HashJoinDesc {
@@ -87,6 +89,7 @@ impl HashJoinDesc {
8789
probe_keys_rt,
8890
broadcast: join.broadcast,
8991
original_join_type: join.original_join_type.clone(),
92+
enable_bloom_runtime_filter: join.enable_bloom_runtime_filter,
9093
})
9194
}
9295

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,9 @@ impl HashJoinBuildState {
152152
if !is_cluster || is_broadcast_join {
153153
enable_inlist_runtime_filter = true;
154154
enable_min_max_runtime_filter = true;
155-
if ctx.get_settings().get_runtime_filter()? {
155+
if ctx.get_settings().get_bloom_runtime_filter()?
156+
&& hash_join_state.hash_join_desc.enable_bloom_runtime_filter
157+
{
156158
enable_bloom_runtime_filter = true;
157159
}
158160
}

src/query/service/src/schedulers/fragments/fragmenter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ impl PhysicalPlanReplacer for Fragmenter {
251251
need_hold_hash_table: plan.need_hold_hash_table,
252252
stat_info: plan.stat_info.clone(),
253253
probe_keys_rt: plan.probe_keys_rt.clone(),
254+
enable_bloom_runtime_filter: plan.enable_bloom_runtime_filter,
254255
broadcast: plan.broadcast,
255256
original_join_type: plan.original_join_type.clone(),
256257
}))

src/query/settings/src/settings_default.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,8 @@ impl DefaultSettings {
265265
mode: SettingMode::Both,
266266
range: None,
267267
}),
268-
("enable_runtime_filter", DefaultSettingValue {
269-
value: UserSettingValue::UInt64(0),
268+
("enable_bloom_runtime_filter", DefaultSettingValue {
269+
value: UserSettingValue::UInt64(1),
270270
desc: "Enables runtime filter optimization for JOIN.",
271271
mode: SettingMode::Both,
272272
range: Some(SettingRange::Numeric(0..=1)),

src/query/settings/src/settings_getter_setter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,8 @@ impl Settings {
269269
Ok(self.try_get_u64("join_spilling_threshold")? as usize)
270270
}
271271

272-
pub fn get_runtime_filter(&self) -> Result<bool> {
273-
Ok(self.try_get_u64("enable_runtime_filter")? != 0)
272+
pub fn get_bloom_runtime_filter(&self) -> Result<bool> {
273+
Ok(self.try_get_u64("enable_bloom_runtime_filter")? != 0)
274274
}
275275

276276
pub fn get_prefer_broadcast_join(&self) -> Result<bool> {

src/query/sql/src/executor/physical_plan_visitor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ pub trait PhysicalPlanReplacer {
237237
need_hold_hash_table: plan.need_hold_hash_table,
238238
stat_info: plan.stat_info.clone(),
239239
probe_keys_rt: plan.probe_keys_rt.clone(),
240+
enable_bloom_runtime_filter: plan.enable_bloom_runtime_filter,
240241
broadcast: plan.broadcast,
241242
original_join_type: plan.original_join_type.clone(),
242243
}))

src/query/sql/src/executor/physical_plans/physical_hash_join.rs

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
17+
use databend_common_catalog::table_context::TableContext;
1518
use databend_common_exception::ErrorCode;
1619
use databend_common_exception::Result;
1720
use databend_common_expression::type_check::check_cast;
@@ -31,11 +34,13 @@ use crate::executor::physical_plans::FragmentKind;
3134
use crate::executor::PhysicalPlan;
3235
use crate::executor::PhysicalPlanBuilder;
3336
use crate::optimizer::ColumnSet;
37+
use crate::optimizer::RelExpr;
3438
use crate::optimizer::SExpr;
3539
use crate::plans::Join;
3640
use crate::plans::JoinType;
3741
use crate::ColumnEntry;
3842
use crate::IndexType;
43+
use crate::MetadataRef;
3944
use crate::ScalarExpr;
4045
use crate::TypeCheck;
4146

@@ -72,6 +77,8 @@ pub struct HashJoin {
7277

7378
// probe keys for runtime filter, and record the index of table that used in probe keys.
7479
pub probe_keys_rt: Vec<Option<(RemoteExpr<String>, IndexType)>>,
80+
// If enable bloom runtime filter
81+
pub enable_bloom_runtime_filter: bool,
7582
// Under cluster, mark if the join is broadcast join.
7683
pub broadcast: bool,
7784
// Original join type. Left/Right single join may be convert to inner join
@@ -191,6 +198,7 @@ impl PhysicalPlanBuilder {
191198
let mut right_join_conditions = Vec::new();
192199
let mut left_join_conditions_rt = Vec::new();
193200
let mut probe_to_build_index = Vec::new();
201+
let mut table_index = None;
194202
for (left_condition, right_condition) in join
195203
.left_conditions
196204
.iter()
@@ -213,18 +221,21 @@ impl PhysicalPlanBuilder {
213221
}) {
214222
if let Some(column_idx) = left_condition.used_columns().iter().next() {
215223
// Safe to unwrap because we have checked the column is a base table column.
216-
let table_index = self
217-
.metadata
218-
.read()
219-
.column(*column_idx)
220-
.table_index()
221-
.unwrap();
224+
if table_index.is_none() {
225+
table_index = Some(
226+
self.metadata
227+
.read()
228+
.column(*column_idx)
229+
.table_index()
230+
.unwrap(),
231+
);
232+
}
222233
Some((
223234
left_condition
224235
.as_raw_expr()
225236
.type_check(&*self.metadata.read())?
226237
.project_column_ref(|col| col.column_name.clone()),
227-
table_index,
238+
table_index.unwrap(),
228239
))
229240
} else {
230241
None
@@ -499,6 +510,35 @@ impl PhysicalPlanBuilder {
499510
stat_info: Some(stat_info),
500511
broadcast: is_broadcast,
501512
original_join_type: join.original_join_type.clone(),
513+
enable_bloom_runtime_filter: adjust_bloom_runtime_filter(
514+
self.ctx.clone(),
515+
&self.metadata,
516+
table_index,
517+
s_expr,
518+
)
519+
.await?,
502520
}))
503521
}
504522
}
523+
524+
// Check if enable bloom runtime filter
525+
async fn adjust_bloom_runtime_filter(
526+
ctx: Arc<dyn TableContext>,
527+
metadata: &MetadataRef,
528+
table_index: Option<IndexType>,
529+
s_expr: &SExpr,
530+
) -> Result<bool> {
531+
if let Some(table_index) = table_index {
532+
let table = metadata.read().table(table_index).table();
533+
if let Some(stats) = table.table_statistics(ctx.clone()).await? {
534+
if let Some(num_rows) = stats.num_rows {
535+
let join_cardinality = RelExpr::with_s_expr(s_expr)
536+
.derive_cardinality()?
537+
.cardinality;
538+
// If the filtered data reduces to less than 1/1000 of the original dataset, we will enable bloom runtime filter.
539+
return Ok(join_cardinality <= (num_rows / 1000) as f64);
540+
}
541+
}
542+
}
543+
Ok(false)
544+
}
Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
11
statement ok
22
set enable_distributed_merge_into = 1;
33

4-
statement ok
5-
set enable_runtime_filter = 1;
6-
74
include ./09_0036_merge_into_without_distributed_enable.test
85

96
statement ok
107
set enable_distributed_merge_into = 0;
11-
12-
statement ok
13-
set enable_runtime_filter = 0;

tests/sqllogictests/suites/query/join/runtime_filter.test

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
statement ok
2-
set enable_runtime_filter = 1;
3-
41
statement ok
52
CREATE TABLE table1 (
63
key1 String,
@@ -89,9 +86,6 @@ NULL NULL
8986
NULL NULL
9087
NULL NULL
9188

92-
statement ok
93-
set enable_runtime_filter = 0;
94-
9589
statement ok
9690
drop table table1;
9791

tests/sqllogictests/suites/tpcds/queries.test

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
statement ok
22
set sandbox_tenant = 'test_tenant';
33

4-
statement ok
5-
set enable_runtime_filter = 1;
6-
74
statement ok
85
use tpcds;
96

@@ -7979,6 +7976,3 @@ Conventional childr NEXT DAY ny metro 256 251 235 0 0
79797976
Conventional childr OVERNIGHT ny metro 188 181 188 0 0
79807977
Conventional childr REGULAR ny metro 179 150 215 0 0
79817978
Conventional childr TWO DAY ny metro 185 183 158 0 0
7982-
7983-
statement ok
7984-
set enable_runtime_filter = 0

0 commit comments

Comments
 (0)