Skip to content

Commit 493af79

Browse files
authored
chore(planner): fix distributed merge into (#16228)
* chore(planner): fix distributed merge into * chore(code): make lint * chore(planner): add is_broadcast check for BroadcastToShuffleOptimizer * chore(test): fix sqllogictest * chore(test): add distributed merge into test * chore(test): change to distributed plan * chore(test): remove explain
1 parent da0dfaa commit 493af79

File tree

17 files changed

+340
-157
lines changed

17 files changed

+340
-157
lines changed

src/query/settings/src/settings_default.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -296,12 +296,6 @@ impl DefaultSettings {
296296
mode: SettingMode::Both,
297297
range: Some(SettingRange::Numeric(0..=u64::MAX)),
298298
}),
299-
("disable_merge_into_join_reorder", DefaultSettingValue {
300-
value: UserSettingValue::UInt64(0),
301-
desc: "Disable merge into join reorder optimization.",
302-
mode: SettingMode::Both,
303-
range: Some(SettingRange::Numeric(0..=1)),
304-
}),
305299
("enable_merge_into_row_fetch", DefaultSettingValue {
306300
value: UserSettingValue::UInt64(1),
307301
desc: "Enable merge into row fetch optimization.",
@@ -356,6 +350,12 @@ impl DefaultSettings {
356350
mode: SettingMode::Both,
357351
range: Some(SettingRange::Numeric(0..=1)),
358352
}),
353+
("enforce_shuffle_join", DefaultSettingValue {
354+
value: UserSettingValue::UInt64(0),
355+
desc: "Enforce shuffle join.",
356+
mode: SettingMode::Both,
357+
range: Some(SettingRange::Numeric(0..=1)),
358+
}),
359359
("storage_fetch_part_num", DefaultSettingValue {
360360
value: UserSettingValue::UInt64(2),
361361
desc: "Sets the number of partitions that are fetched in parallel from storage during query execution.",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,8 @@ impl Settings {
302302
Ok(self.try_get_u64("enforce_broadcast_join")? != 0)
303303
}
304304

305-
pub fn get_disable_merge_into_join_reorder(&self) -> Result<bool> {
306-
Ok(self.try_get_u64("disable_merge_into_join_reorder")? != 0)
305+
pub fn get_enforce_shuffle_join(&self) -> Result<bool> {
306+
Ok(self.try_get_u64("enforce_shuffle_join")? != 0)
307307
}
308308

309309
pub fn get_enable_merge_into_row_fetch(&self) -> Result<bool> {

src/query/sql/src/executor/physical_plans/physical_mutation.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ impl PhysicalPlanBuilder {
115115
distributed,
116116
predicate_column_index,
117117
row_id_index,
118-
change_join_order,
118+
row_id_shuffle,
119119
can_try_update_column_only,
120120
..
121121
} = mutation;
@@ -230,9 +230,7 @@ impl PhysicalPlanBuilder {
230230
// For distributed merge, we shuffle data blocks by block_id (drived from row_id) to avoid
231231
// different nodes update the same physical block simultaneously, data blocks that are needed
232232
// to insert just keep in local node.
233-
let source_is_broadcast =
234-
matches!(strategy, MutationStrategy::MatchedOnly) && !change_join_order;
235-
if *distributed && !is_not_matched_only && !source_is_broadcast {
233+
if *distributed && *row_id_shuffle && !is_not_matched_only {
236234
plan = PhysicalPlan::Exchange(build_block_id_shuffle_exchange(
237235
plan,
238236
bind_context,

src/query/sql/src/planner/binder/bind_mutation/bind.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ impl Binder {
306306
field_index_map,
307307
strategy: mutation_strategy.clone(),
308308
distributed: false,
309-
change_join_order: false,
309+
row_id_shuffle: true,
310310
row_id_index: target_table_row_id_index,
311311
can_try_update_column_only: self.can_try_update_column_only(&matched_clauses),
312312
lock_guard,

src/query/sql/src/planner/binder/bind_mutation/merge.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ use crate::plans::Plan;
3030
use crate::BindContext;
3131

3232
// Merge into strategies:
33-
// 1. Insert only: RIGHT ANTI join.
34-
// 2. Matched and unmatched: RIGHT OUTER join.
35-
// 3. Matched only: INNER join.
33+
// 1. Insert only: target right-anti join source.
34+
// 2. Matched and unmatched: target right-outer join source.
35+
// 3. Matched only: target inner join source.
3636
impl Binder {
3737
#[allow(warnings)]
3838
#[async_backtrace::framed]

src/query/sql/src/planner/binder/bind_mutation/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ mod update;
2020

2121
pub use bind::MutationStrategy;
2222
pub use bind::MutationType;
23-
pub use mutation_expression::target_table_position;
23+
pub use mutation_expression::target_probe;

src/query/sql/src/planner/binder/bind_mutation/mutation_expression.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ use std::sync::Arc;
1818

1919
use databend_common_ast::ast::Expr;
2020
use databend_common_ast::ast::JoinCondition;
21-
use databend_common_ast::ast::JoinOperator::Inner;
22-
use databend_common_ast::ast::JoinOperator::RightAnti;
23-
use databend_common_ast::ast::JoinOperator::RightOuter;
21+
use databend_common_ast::ast::JoinOperator;
2422
use databend_common_ast::ast::TableReference;
2523
use databend_common_catalog::plan::InternalColumn;
2624
use databend_common_catalog::plan::InternalColumnType;
@@ -164,9 +162,9 @@ impl MutationExpression {
164162
target_s_expr,
165163
source_s_expr,
166164
match mutation_strategy {
167-
MutationStrategy::MatchedOnly => Inner,
168-
MutationStrategy::NotMatchedOnly => RightAnti,
169-
MutationStrategy::MixedMatched => RightOuter,
165+
MutationStrategy::MatchedOnly => JoinOperator::Inner,
166+
MutationStrategy::NotMatchedOnly => JoinOperator::RightAnti,
167+
MutationStrategy::MixedMatched => JoinOperator::RightOuter,
170168
MutationStrategy::Direct => unreachable!(),
171169
},
172170
JoinCondition::On(Box::new(match_expr.clone())),
@@ -568,10 +566,11 @@ pub struct MutationExpressionBindResult {
568566
pub direct_filter: Option<ScalarExpr>,
569567
}
570568

571-
pub fn target_table_position(s_expr: &SExpr, target_table_index: usize) -> Result<usize> {
569+
pub fn target_probe(s_expr: &SExpr, target_table_index: usize) -> Result<bool> {
572570
if !matches!(s_expr.plan(), RelOperator::Join(_)) {
573-
return Ok(0);
571+
return Ok(false);
574572
}
573+
575574
fn contains_target_table(s_expr: &SExpr, target_table_index: usize) -> bool {
576575
if let RelOperator::Scan(ref scan) = s_expr.plan() {
577576
scan.table_index == target_table_index
@@ -582,9 +581,5 @@ pub fn target_table_position(s_expr: &SExpr, target_table_index: usize) -> Resul
582581
}
583582
}
584583

585-
if contains_target_table(s_expr.child(0)?, target_table_index) {
586-
Ok(0)
587-
} else {
588-
Ok(1)
589-
}
584+
Ok(contains_target_table(s_expr.child(0)?, target_table_index))
590585
}

src/query/sql/src/planner/binder/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ mod window;
5757

5858
pub use aggregate::AggregateInfo;
5959
pub use bind_context::*;
60-
pub use bind_mutation::target_table_position;
60+
pub use bind_mutation::target_probe;
6161
pub use bind_mutation::MutationStrategy;
6262
pub use bind_mutation::MutationType;
6363
pub use bind_query::bind_values;

src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,29 @@ use databend_common_exception::Result;
1818

1919
use crate::optimizer::extract::Matcher;
2020
use crate::optimizer::SExpr;
21-
use crate::plans::Exchange::Hash;
21+
use crate::plans::Exchange;
2222
use crate::plans::Join;
2323
use crate::plans::RelOp;
2424
use crate::plans::RelOperator;
25-
pub struct MergeOptimizer {
26-
pub merge_matcher: Matcher,
25+
pub struct BroadcastToShuffleOptimizer {
26+
pub matcher: Matcher,
2727
}
2828

29-
impl MergeOptimizer {
29+
impl BroadcastToShuffleOptimizer {
3030
pub fn create() -> Self {
3131
Self {
32-
merge_matcher: Self::merge_matcher(),
32+
matcher: Self::matcher(),
3333
}
3434
}
3535

36+
pub fn is_broadcast(&self, s_expr: &SExpr) -> Result<bool> {
37+
let right = s_expr.child(1)?;
38+
Ok(matches!(
39+
right.plan(),
40+
RelOperator::Exchange(Exchange::Broadcast)
41+
))
42+
}
43+
3644
pub fn optimize(&self, s_expr: &SExpr) -> Result<SExpr> {
3745
let left_exchange_input = s_expr.child(0)?;
3846

@@ -51,11 +59,11 @@ impl MergeOptimizer {
5159

5260
let new_join_children = vec![
5361
Arc::new(SExpr::create_unary(
54-
Arc::new(RelOperator::Exchange(Hash(left_conditions))),
62+
Arc::new(RelOperator::Exchange(Exchange::Hash(left_conditions))),
5563
Arc::new(left_exchange_input.clone()),
5664
)),
5765
Arc::new(SExpr::create_unary(
58-
Arc::new(RelOperator::Exchange(Hash(right_conditions))),
66+
Arc::new(RelOperator::Exchange(Exchange::Hash(right_conditions))),
5967
Arc::new(right_exchange_input.clone()),
6068
)),
6169
];
@@ -64,7 +72,7 @@ impl MergeOptimizer {
6472
Ok(join_s_expr)
6573
}
6674

67-
fn merge_matcher() -> Matcher {
75+
fn matcher() -> Matcher {
6876
// Input:
6977
// Join
7078
// / \

src/query/sql/src/planner/optimizer/distributed/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ mod distributed_merge;
1818
mod sort_and_limit;
1919

2020
pub use distributed::optimize_distributed_query;
21-
pub use distributed_merge::MergeOptimizer;
21+
pub use distributed_merge::BroadcastToShuffleOptimizer;
2222
pub use sort_and_limit::SortAndLimitPushDownOptimizer;

0 commit comments

Comments
 (0)