Skip to content

Commit 6a3cd13

Browse files
authored
fix: build runtime filter in cluster (#18247)
* avoid too much log * fix: build runtime filter in cluster * add optimizer test * simplify test * update * remove unused * fix * fix test * fix test * refine explain runtime filter * make lint-yaml * make lint * refactor test * fix ut * fix logic test * fix logic test * fix logic test * rm useless file * fix * fix ut * fix test * fix
1 parent 78754dd commit 6a3cd13

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+215
-222
lines changed

src/query/catalog/src/runtime_filter_info.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ impl Debug for RuntimeFilterInfo {
4444
.collect::<Vec<String>>()
4545
.join(","),
4646
self.bloom
47+
.iter()
48+
.map(|(name, _)| name)
49+
.collect::<Vec<&String>>()
4750
)
4851
}
4952
}

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
use std::collections::HashMap;
1616
use std::collections::HashSet;
17+
use std::fmt;
18+
use std::fmt::Debug;
1719

1820
use databend_common_expression::BlockMetaInfo;
1921
use databend_common_expression::BlockMetaInfoDowncast;
@@ -28,14 +30,27 @@ use databend_common_expression::Scalar;
2830
/// * `inlist` - Deduplicated list of build key column
2931
/// * `min_max` - The min and max values of the build column
3032
/// * `bloom` - The deduplicated hashes of the build column
31-
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
33+
#[derive(serde::Serialize, serde::Deserialize, Clone, Default, PartialEq)]
3234
pub struct RuntimeFilterPacket {
3335
pub id: usize,
3436
pub inlist: Option<Column>,
3537
pub min_max: Option<SerializableDomain>,
3638
pub bloom: Option<HashSet<u64>>,
3739
}
3840

41+
impl Debug for RuntimeFilterPacket {
42+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43+
write!(
44+
f,
45+
"RuntimeFilterPacket {{ id: {}, inlist: {:?}, min_max: {:?}, bloom: {:?} }}",
46+
self.id,
47+
self.inlist,
48+
self.min_max,
49+
self.bloom.is_some()
50+
)
51+
}
52+
}
53+
3954
/// Represents a collection of runtime filter packets that correspond to a join operator.
4055
///
4156
/// # Fields

src/query/service/tests/it/sql/planner/optimizer/data/results/basic/01_cross_join_aggregation_physical.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ AggregateFinal
1717
├── probe keys: []
1818
├── keys is null equal: []
1919
├── filters: []
20-
├── build join filters:
2120
├── estimated rows: 25000000.00
2221
├── Exchange(Build)
2322
│ ├── output columns: [i2.i (#1)]

src/query/sql/src/executor/format.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1587,9 +1587,22 @@ fn hash_join_to_format_tree(
15871587
FormatTreeNode::new(format!("probe keys: [{probe_keys}]")),
15881588
FormatTreeNode::new(format!("keys is null equal: [{is_null_equal}]")),
15891589
FormatTreeNode::new(format!("filters: [{filters}]")),
1590-
FormatTreeNode::with_children(format!("build join filters:"), build_runtime_filters),
15911590
];
15921591

1592+
if !build_runtime_filters.is_empty() {
1593+
if plan.broadcast_id.is_some() {
1594+
children.push(FormatTreeNode::with_children(
1595+
format!("build join filters(distributed):"),
1596+
build_runtime_filters,
1597+
));
1598+
} else {
1599+
children.push(FormatTreeNode::with_children(
1600+
format!("build join filters:"),
1601+
build_runtime_filters,
1602+
));
1603+
}
1604+
}
1605+
15931606
if let Some((cache_index, column_map)) = &plan.build_side_cache_info {
15941607
let mut column_indexes = column_map.keys().collect::<Vec<_>>();
15951608
column_indexes.sort();

src/query/sql/src/executor/physical_plans/physical_hash_join.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use databend_common_expression::RemoteExpr;
2828
use databend_common_functions::BUILTIN_FUNCTIONS;
2929

3030
use super::physical_join_filter::PhysicalRuntimeFilters;
31-
use super::FragmentKind;
3231
use super::JoinRuntimeFilter;
3332
use crate::executor::explain::PlanStatsInfo;
3433
use crate::executor::physical_plans::Exchange;
@@ -789,6 +788,7 @@ impl PhysicalPlanBuilder {
789788
#[allow(clippy::too_many_arguments)]
790789
fn create_hash_join(
791790
&self,
791+
s_expr: &SExpr,
792792
join: &Join,
793793
probe_side: Box<PhysicalPlan>,
794794
build_side: Box<PhysicalPlan>,
@@ -805,10 +805,10 @@ impl PhysicalPlanBuilder {
805805
runtime_filter: PhysicalRuntimeFilters,
806806
stat_info: PlanStatsInfo,
807807
) -> Result<PhysicalPlan> {
808-
let broadcast_id = if let PhysicalPlan::Exchange(Exchange {
809-
kind: FragmentKind::Normal,
810-
..
811-
}) = build_side.as_ref()
808+
let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?;
809+
let broadcast_id = if build_side_data_distribution
810+
.as_ref()
811+
.is_some_and(|e| matches!(e, crate::plans::Exchange::Hash(_)))
812812
{
813813
Some(self.ctx.get_next_broadcast_id())
814814
} else {
@@ -917,12 +917,12 @@ impl PhysicalPlanBuilder {
917917
s_expr,
918918
&right_join_conditions,
919919
left_join_conditions_rt,
920-
&build_side,
921920
)
922921
.await?;
923922

924923
// Step 12: Create and return the HashJoin
925924
self.create_hash_join(
925+
s_expr,
926926
join,
927927
probe_side,
928928
build_side,
@@ -947,7 +947,6 @@ impl PhysicalPlanBuilder {
947947
s_expr: &SExpr,
948948
build_keys: &[RemoteExpr],
949949
probe_keys: Vec<Option<(RemoteExpr<String>, usize, usize)>>,
950-
build_side: &PhysicalPlan,
951950
) -> Result<PhysicalRuntimeFilters> {
952951
JoinRuntimeFilter::build_runtime_filter(
953952
self.ctx.clone(),
@@ -956,7 +955,6 @@ impl PhysicalPlanBuilder {
956955
s_expr,
957956
build_keys,
958957
probe_keys,
959-
build_side,
960958
)
961959
.await
962960
}

src/query/sql/src/executor/physical_plans/physical_join_filter.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@ use databend_common_expression::RemoteExpr;
2222
use databend_common_functions::BUILTIN_FUNCTIONS;
2323
use databend_storages_common_table_meta::table::get_change_type;
2424

25-
use super::FragmentKind;
26-
use crate::executor::PhysicalPlan;
2725
use crate::optimizer::ir::RelExpr;
2826
use crate::optimizer::ir::SExpr;
27+
use crate::plans::Exchange;
2928
use crate::plans::Join;
3029
use crate::plans::JoinType;
3130
use crate::IndexType;
@@ -145,7 +144,6 @@ impl JoinRuntimeFilter {
145144
s_expr: &SExpr,
146145
build_keys: &[RemoteExpr],
147146
probe_keys: Vec<Option<(RemoteExpr<String>, usize, usize)>>,
148-
build_side: &PhysicalPlan,
149147
) -> Result<PhysicalRuntimeFilters> {
150148
if !ctx.get_settings().get_enable_join_runtime_filter()? {
151149
return Ok(Default::default());
@@ -157,6 +155,8 @@ impl JoinRuntimeFilter {
157155

158156
let mut filters = Vec::new();
159157

158+
let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?;
159+
160160
// Process each probe key that has runtime filter information
161161
for (build_key, probe_key, scan_id, table_index) in build_keys
162162
.iter()
@@ -176,9 +176,9 @@ impl JoinRuntimeFilter {
176176

177177
// Determine which filter types to enable based on data type and statistics
178178
let enable_bloom_runtime_filter = {
179-
let enable_in_cluster = build_side
180-
.as_exchange()
181-
.is_none_or(|e| matches!(e.kind, FragmentKind::Expansive));
179+
let enable_in_cluster = build_side_data_distribution
180+
.as_ref()
181+
.is_none_or(|e| matches!(e, Exchange::Broadcast));
182182
let is_supported_type = Self::is_type_supported_for_bloom_filter(&data_type);
183183
let enable_bloom_runtime_filter_based_on_stats = Self::adjust_bloom_runtime_filter(
184184
ctx.clone(),
@@ -190,15 +190,14 @@ impl JoinRuntimeFilter {
190190
enable_in_cluster && is_supported_type && enable_bloom_runtime_filter_based_on_stats
191191
};
192192

193-
let enable_min_max_runtime_filter = build_side.as_exchange().is_none_or(|e| {
194-
matches!(e.kind, FragmentKind::Expansive) || matches!(e.kind, FragmentKind::Normal)
195-
}) && Self::is_type_supported_for_min_max_filter(
196-
&data_type,
197-
);
193+
let enable_min_max_runtime_filter = build_side_data_distribution
194+
.as_ref()
195+
.is_none_or(|e| matches!(e, Exchange::Broadcast | Exchange::Hash(_)))
196+
&& Self::is_type_supported_for_min_max_filter(&data_type);
198197

199-
let enable_inlist_runtime_filter = build_side.as_exchange().is_none_or(|e| {
200-
matches!(e.kind, FragmentKind::Expansive) || matches!(e.kind, FragmentKind::Normal)
201-
});
198+
let enable_inlist_runtime_filter = build_side_data_distribution
199+
.as_ref()
200+
.is_none_or(|e| matches!(e, Exchange::Broadcast | Exchange::Hash(_)));
202201

203202
// Create and add the runtime filter
204203
let runtime_filter = PhysicalRuntimeFilter {

src/query/sql/src/planner/optimizer/ir/expr/s_expr.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,16 @@ impl SExpr {
145145
&self.children[1]
146146
}
147147

148+
pub fn build_side_child(&self) -> &SExpr {
149+
assert_eq!(self.plan.rel_op(), crate::plans::RelOp::Join);
150+
&self.children[1]
151+
}
152+
153+
pub fn probe_side_child(&self) -> &SExpr {
154+
assert_eq!(self.plan.rel_op(), crate::plans::RelOp::Join);
155+
&self.children[0]
156+
}
157+
148158
pub fn arity(&self) -> usize {
149159
self.children.len()
150160
}
@@ -467,4 +477,37 @@ impl SExpr {
467477
*self.rel_prop.lock().unwrap() = Some(rel_prop.clone());
468478
Ok(rel_prop)
469479
}
480+
481+
pub fn get_data_distribution(&self) -> Result<Option<Exchange>> {
482+
match self.plan.rel_op() {
483+
crate::plans::RelOp::Scan
484+
| crate::plans::RelOp::DummyTableScan
485+
| crate::plans::RelOp::ConstantTableScan
486+
| crate::plans::RelOp::ExpressionScan
487+
| crate::plans::RelOp::CacheScan
488+
| crate::plans::RelOp::RecursiveCteScan => Ok(None),
489+
490+
crate::plans::RelOp::Join => self.probe_side_child().get_data_distribution(),
491+
492+
crate::plans::RelOp::Exchange => {
493+
Ok(Some(self.plan.as_ref().clone().try_into().unwrap()))
494+
}
495+
496+
crate::plans::RelOp::EvalScalar
497+
| crate::plans::RelOp::Filter
498+
| crate::plans::RelOp::Aggregate
499+
| crate::plans::RelOp::Sort
500+
| crate::plans::RelOp::Limit
501+
| crate::plans::RelOp::UnionAll
502+
| crate::plans::RelOp::Window
503+
| crate::plans::RelOp::ProjectSet
504+
| crate::plans::RelOp::Udf
505+
| crate::plans::RelOp::Udaf
506+
| crate::plans::RelOp::AsyncFunction
507+
| crate::plans::RelOp::MergeInto
508+
| crate::plans::RelOp::CompactBlock
509+
| crate::plans::RelOp::MutationSource
510+
| crate::plans::RelOp::Pattern => self.child(0)?.get_data_distribution(),
511+
}
512+
}
470513
}

tests/sqllogictests/suites/mode/cluster/exchange.test

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ Exchange
181181
├── probe keys: [CAST(t1.number (#2) AS UInt64 NULL)]
182182
├── keys is null equal: [false]
183183
├── filters: []
184-
├── build join filters:
185184
├── estimated rows: 2.00
186185
├── Exchange(Build)
187186
│ ├── output columns: [sum(number) (#1), numbers.number (#0)]
@@ -267,7 +266,6 @@ Fragment 2:
267266
├── probe keys: [CAST(t1.number (#2) AS UInt64 NULL)]
268267
├── keys is null equal: [false]
269268
├── filters: []
270-
├── build join filters:
271269
├── estimated rows: 2.00
272270
├── ExchangeSource(Build)
273271
│ ├── output columns: [sum(number) (#1), numbers.number (#0)]
@@ -573,7 +571,6 @@ Exchange
573571
├── probe keys: []
574572
├── keys is null equal: []
575573
├── filters: []
576-
├── build join filters:
577574
├── estimated rows: 100000.00
578575
├── Exchange(Build)
579576
│ ├── output columns: [numbers.number (#2)]
@@ -594,7 +591,6 @@ Exchange
594591
├── probe keys: []
595592
├── keys is null equal: []
596593
├── filters: []
597-
├── build join filters:
598594
├── estimated rows: 10000.00
599595
├── Exchange(Build)
600596
│ ├── output columns: [numbers.number (#0)]
@@ -634,7 +630,7 @@ Exchange
634630
├── probe keys: [t1.number (#0)]
635631
├── keys is null equal: [false]
636632
├── filters: []
637-
├── build join filters:
633+
├── build join filters(distributed):
638634
│ └── filter id:0, build key:t2.number (#1), probe key:t1.number (#0), filter type:inlist,min_max
639635
├── estimated rows: 200.00
640636
├── Exchange(Build)

0 commit comments

Comments
 (0)