Skip to content

Commit ff9b1fc

Browse files
authored
refactor: remove MergeIntoSource operator (#15435)
refactor: remove MergeIntoSource operator
1 parent 9da324f commit ff9b1fc

File tree

12 files changed

+63
-168
lines changed

12 files changed

+63
-168
lines changed

src/query/service/src/interpreters/interpreter_merge_into.rs

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ use databend_common_sql::executor::physical_plans::Exchange;
4040
use databend_common_sql::executor::physical_plans::FragmentKind;
4141
use databend_common_sql::executor::physical_plans::MergeInto;
4242
use databend_common_sql::executor::physical_plans::MergeIntoAppendNotMatched;
43-
use databend_common_sql::executor::physical_plans::MergeIntoSource;
4443
use databend_common_sql::executor::physical_plans::MutationKind;
4544
use databend_common_sql::executor::PhysicalPlan;
4645
use databend_common_sql::executor::PhysicalPlanBuilder;
@@ -69,7 +68,6 @@ use crate::stream::DataBlockStream;
6968

7069
// predicate_index should not conflict with update expr's column_binding's index.
7170
pub const PREDICATE_COLUMN_INDEX: IndexType = MAX as usize;
72-
const DUMMY_COL_INDEX: usize = MAX as usize;
7371
pub struct MergeIntoInterpreter {
7472
ctx: Arc<QueryContext>,
7573
plan: MergePlan,
@@ -222,8 +220,7 @@ impl MergeIntoInterpreter {
222220
};
223221

224222
let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false);
225-
// build source for MergeInto
226-
let join_input = builder.build(&input, *columns_set.clone()).await?;
223+
let mut join_input = builder.build(&input, *columns_set.clone()).await?;
227224

228225
// find row_id column index
229226
let join_output_schema = join_input.output_schema()?;
@@ -243,7 +240,7 @@ impl MergeIntoInterpreter {
243240
Some(row_id_idx) => row_id_idx,
244241
}
245242
} else {
246-
DUMMY_COL_INDEX
243+
DUMMY_COLUMN_INDEX
247244
};
248245

249246
let mut found_row_id = false;
@@ -258,15 +255,14 @@ impl MergeIntoInterpreter {
258255

259256
// we use `merge_into_split_idx` to specify a column from target table to split a block
260257
// from join into matched part and unmatched part.
261-
let mut merge_into_split_idx = DUMMY_COLUMN_INDEX;
258+
let mut merge_into_split_idx = None;
262259
if matches!(merge_type, MergeIntoType::FullOperation) {
263260
for (idx, data_field) in join_output_schema.fields().iter().enumerate() {
264261
if *data_field.name() == split_idx.to_string() {
265-
merge_into_split_idx = idx;
262+
merge_into_split_idx = Some(idx);
266263
break;
267264
}
268265
}
269-
assert!(merge_into_split_idx != DUMMY_COLUMN_INDEX);
270266
}
271267

272268
if *distributed && !*change_join_order {
@@ -289,33 +285,15 @@ impl MergeIntoInterpreter {
289285
let table_info = fuse_table.get_table_info().clone();
290286
let catalog_ = self.ctx.get_catalog(catalog).await?;
291287

292-
// merge_into_source is used to recv join's datablocks and split them into matched and not matched
293-
// datablocks.
294-
let merge_into_source = if !*distributed && extract_exchange {
295-
// if we doesn't support distributed merge into, we should give the exchange merge back.
296-
let rollback_join_input = PhysicalPlan::Exchange(Exchange {
288+
if !*distributed && extract_exchange {
289+
join_input = PhysicalPlan::Exchange(Exchange {
297290
plan_id: 0,
298291
input: Box::new(join_input),
299292
kind: FragmentKind::Merge,
300293
keys: vec![],
301294
allow_adjust_parallelism: true,
302295
ignore_exchange: false,
303296
});
304-
PhysicalPlan::MergeIntoSource(MergeIntoSource {
305-
input: Box::new(rollback_join_input),
306-
row_id_idx: row_id_idx as u32,
307-
merge_type: merge_type.clone(),
308-
merge_into_split_idx: merge_into_split_idx as u32,
309-
plan_id: u32::MAX,
310-
})
311-
} else {
312-
PhysicalPlan::MergeIntoSource(MergeIntoSource {
313-
input: Box::new(join_input),
314-
row_id_idx: row_id_idx as u32,
315-
merge_type: merge_type.clone(),
316-
merge_into_split_idx: merge_into_split_idx as u32,
317-
plan_id: u32::MAX,
318-
})
319297
};
320298

321299
// transform unmatched for insert
@@ -371,7 +349,7 @@ impl MergeIntoInterpreter {
371349
let update_list = if let Some(update_list) = &item.update {
372350
// we don't need real col_indices here, just give a
373351
// dummy index, that's ok.
374-
let col_indices = vec![DUMMY_COL_INDEX];
352+
let col_indices = vec![DUMMY_COLUMN_INDEX];
375353
let (database, table) = match target_alias {
376354
None => (Some(database.as_str()), table_name.clone()),
377355
Some(alias) => (None, alias.name.to_string().to_lowercase()),
@@ -437,10 +415,8 @@ impl MergeIntoInterpreter {
437415
.collect();
438416

439417
let commit_input = if !distributed {
440-
// recv datablocks from matched upstream and unmatched upstream
441-
// transform and append data
442418
PhysicalPlan::MergeInto(Box::new(MergeInto {
443-
input: Box::new(merge_into_source),
419+
input: Box::new(join_input.clone()),
444420
table_info: table_info.clone(),
445421
catalog_info: catalog_.info(),
446422
unmatched,
@@ -455,10 +431,11 @@ impl MergeIntoInterpreter {
455431
target_build_optimization,
456432
can_try_update_column_only: *can_try_update_column_only,
457433
plan_id: u32::MAX,
434+
merge_into_split_idx,
458435
}))
459436
} else {
460437
let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
461-
input: Box::new(merge_into_source.clone()),
438+
input: Box::new(join_input.clone()),
462439
table_info: table_info.clone(),
463440
catalog_info: catalog_.info(),
464441
unmatched: unmatched.clone(),
@@ -483,6 +460,7 @@ impl MergeIntoInterpreter {
483460
target_build_optimization: false, // we don't support for distributed mode for now..
484461
can_try_update_column_only: *can_try_update_column_only,
485462
plan_id: u32::MAX,
463+
merge_into_split_idx,
486464
}));
487465
// if change_join_order = true, it means the target is build side,
488466
// in this way, we will do matched operation and not matched operation
@@ -504,7 +482,7 @@ impl MergeIntoInterpreter {
504482
table_info: table_info.clone(),
505483
catalog_info: catalog_.info(),
506484
unmatched: unmatched.clone(),
507-
input_schema: merge_into_source.output_schema()?,
485+
input_schema: join_input.output_schema()?,
508486
merge_type: merge_type.clone(),
509487
change_join_order: *change_join_order,
510488
segments,

src/query/service/src/pipelines/builders/builder_merge_into.rs

Lines changed: 19 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use databend_common_sql::evaluator::CompoundBlockOperator;
3838
use databend_common_sql::executor::physical_plans::MergeInto;
3939
use databend_common_sql::executor::physical_plans::MergeIntoAddRowNumber;
4040
use databend_common_sql::executor::physical_plans::MergeIntoAppendNotMatched;
41-
use databend_common_sql::executor::physical_plans::MergeIntoSource;
4241
use databend_common_sql::executor::physical_plans::MutationKind;
4342
use databend_common_storages_fuse::operations::MatchedSplitProcessor;
4443
use databend_common_storages_fuse::operations::MergeIntoNotMatchedProcessor;
@@ -329,43 +328,6 @@ impl PipelineBuilder {
329328
Ok(())
330329
}
331330

332-
// Optimization Todo(@JackTan25): If insert only, we can reduce the target columns after join.
333-
pub(crate) fn build_merge_into_source(
334-
&mut self,
335-
merge_into_source: &MergeIntoSource,
336-
) -> Result<()> {
337-
let MergeIntoSource {
338-
input,
339-
merge_type,
340-
merge_into_split_idx,
341-
..
342-
} = merge_into_source;
343-
344-
self.build_pipeline(input)?;
345-
self.main_pipeline
346-
.try_resize(self.ctx.get_settings().get_max_threads()? as usize)?;
347-
// 1. if matchedOnly, we will use inner join
348-
// 2. if insert Only, we will use right anti join
349-
// 3. other cases, we use right outer join
350-
// an optimization later: for unmatched only, we can reverse
351-
// `on conditions` and use inner join.
352-
// merge into's parallism depends on the join probe number.
353-
if let MergeIntoType::FullOperation = merge_type {
354-
let mut items = Vec::with_capacity(self.main_pipeline.output_len());
355-
let output_len = self.main_pipeline.output_len();
356-
for _ in 0..output_len {
357-
let merge_into_split_processor =
358-
MergeIntoSplitProcessor::create(*merge_into_split_idx, false)?;
359-
items.push(merge_into_split_processor.into_pipe_item());
360-
}
361-
362-
self.main_pipeline
363-
.add_pipe(Pipe::create(output_len, output_len * 2, items));
364-
}
365-
366-
Ok(())
367-
}
368-
369331
fn resize_row_id(&mut self, step: usize) -> Result<()> {
370332
// resize row_id
371333
let mut ranges = Vec::with_capacity(self.main_pipeline.output_len());
@@ -405,11 +367,29 @@ impl PipelineBuilder {
405367
merge_type,
406368
change_join_order,
407369
can_try_update_column_only,
370+
merge_into_split_idx,
408371
..
409372
} = merge_into;
410373

411374
self.build_pipeline(input)?;
412375

376+
self.main_pipeline
377+
.try_resize(self.ctx.get_settings().get_max_threads()? as usize)?;
378+
379+
// If `merge_into_split_idx` isn't None, it means the merge type is full operation.
380+
if let Some(split_idx) = merge_into_split_idx {
381+
let mut items = Vec::with_capacity(self.main_pipeline.output_len());
382+
let output_len = self.main_pipeline.output_len();
383+
for _ in 0..output_len {
384+
let merge_into_split_processor =
385+
MergeIntoSplitProcessor::create(*split_idx as u32)?;
386+
items.push(merge_into_split_processor.into_pipe_item());
387+
}
388+
389+
self.main_pipeline
390+
.add_pipe(Pipe::create(output_len, output_len * 2, items));
391+
}
392+
413393
let tbl = self
414394
.ctx
415395
.build_table_by_table_info(catalog_info, table_info, None)?;
@@ -443,7 +423,7 @@ impl PipelineBuilder {
443423
// | +---+--------------->| MatchedSplitProcessor |
444424
// | | | | +-+
445425
// +----------------------+ | +---+ +-----------------------------+-+
446-
// | MergeIntoSource +---------->|MergeIntoSplitProcessor|
426+
// | MergeInto +---------->|MergeIntoSplitProcessor|
447427
// +----------------------+ | +---+ +-----------------------------+
448428
// | | | NotMatched | +-+
449429
// | +---+--------------->| MergeIntoNotMatchedProcessor| |

src/query/service/src/pipelines/pipeline_builder.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,7 @@ impl PipelineBuilder {
9999
pub(crate) fn add_plan_scope(&mut self, plan: &PhysicalPlan) -> Result<Option<PlanScopeGuard>> {
100100
match plan {
101101
PhysicalPlan::EvalScalar(v) if v.exprs.is_empty() => Ok(None),
102-
PhysicalPlan::MergeIntoSource(v) if v.merge_type != MergeIntoType::FullOperation => {
103-
Ok(None)
104-
}
102+
PhysicalPlan::MergeInto(v) if v.merge_type != MergeIntoType::FullOperation => Ok(None),
105103
_ => {
106104
let desc = plan.get_desc()?;
107105
let plan_labels = plan.get_labels()?;
@@ -170,9 +168,6 @@ impl PipelineBuilder {
170168

171169
// Merge into.
172170
PhysicalPlan::MergeInto(merge_into) => self.build_merge_into(merge_into),
173-
PhysicalPlan::MergeIntoSource(merge_into_source) => {
174-
self.build_merge_into_source(merge_into_source)
175-
}
176171
PhysicalPlan::MergeIntoAppendNotMatched(merge_into_append_not_matched) => {
177172
self.build_merge_into_append_not_matched(merge_into_append_not_matched)
178173
}

src/query/sql/src/executor/format.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,6 @@ fn to_format_tree(
241241
}
242242
PhysicalPlan::ReplaceInto(_) => Ok(FormatTreeNode::new("Replace".to_string())),
243243
PhysicalPlan::MergeInto(_) => Ok(FormatTreeNode::new("MergeInto".to_string())),
244-
PhysicalPlan::MergeIntoSource(_) => Ok(FormatTreeNode::new("MergeIntoSource".to_string())),
245244
PhysicalPlan::MergeIntoAddRowNumber(_) => {
246245
Ok(FormatTreeNode::new("MergeIntoAddRowNumber".to_string()))
247246
}

src/query/sql/src/executor/physical_plan.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ use crate::executor::physical_plans::MaterializedCte;
5454
use crate::executor::physical_plans::MergeInto;
5555
use crate::executor::physical_plans::MergeIntoAddRowNumber;
5656
use crate::executor::physical_plans::MergeIntoAppendNotMatched;
57-
use crate::executor::physical_plans::MergeIntoSource;
5857
use crate::executor::physical_plans::Project;
5958
use crate::executor::physical_plans::ProjectSet;
6059
use crate::executor::physical_plans::RangeJoin;
@@ -116,7 +115,6 @@ pub enum PhysicalPlan {
116115
ReplaceInto(Box<ReplaceInto>),
117116

118117
/// MergeInto
119-
MergeIntoSource(MergeIntoSource),
120118
MergeInto(Box<MergeInto>),
121119
MergeIntoAppendNotMatched(Box<MergeIntoAppendNotMatched>),
122120
MergeIntoAddRowNumber(Box<MergeIntoAddRowNumber>),
@@ -303,11 +301,6 @@ impl PhysicalPlan {
303301
*next_id += 1;
304302
plan.input.adjust_plan_id(next_id);
305303
}
306-
PhysicalPlan::MergeIntoSource(plan) => {
307-
plan.plan_id = *next_id;
308-
*next_id += 1;
309-
plan.input.adjust_plan_id(next_id);
310-
}
311304
PhysicalPlan::MergeIntoAppendNotMatched(plan) => {
312305
plan.plan_id = *next_id;
313306
*next_id += 1;
@@ -422,7 +415,6 @@ impl PhysicalPlan {
422415
PhysicalPlan::DeleteSource(v) => v.plan_id,
423416
PhysicalPlan::MergeInto(v) => v.plan_id,
424417
PhysicalPlan::MergeIntoAddRowNumber(v) => v.plan_id,
425-
PhysicalPlan::MergeIntoSource(v) => v.plan_id,
426418
PhysicalPlan::MergeIntoAppendNotMatched(v) => v.plan_id,
427419
PhysicalPlan::CommitSink(v) => v.plan_id,
428420
PhysicalPlan::CopyIntoTable(v) => v.plan_id,
@@ -473,7 +465,6 @@ impl PhysicalPlan {
473465
PhysicalPlan::MaterializedCte(plan) => plan.output_schema(),
474466
PhysicalPlan::ConstantTableScan(plan) => plan.output_schema(),
475467
PhysicalPlan::Udf(plan) => plan.output_schema(),
476-
PhysicalPlan::MergeIntoSource(plan) => plan.input.output_schema(),
477468
PhysicalPlan::MergeInto(plan) => Ok(plan.output_schema.clone()),
478469
PhysicalPlan::MergeIntoAddRowNumber(plan) => plan.output_schema(),
479470
PhysicalPlan::ReplaceAsyncSourcer(_)
@@ -535,7 +526,6 @@ impl PhysicalPlan {
535526
PhysicalPlan::ReplaceDeduplicate(_) => "ReplaceDeduplicate".to_string(),
536527
PhysicalPlan::ReplaceInto(_) => "Replace".to_string(),
537528
PhysicalPlan::MergeInto(_) => "MergeInto".to_string(),
538-
PhysicalPlan::MergeIntoSource(_) => "MergeIntoSource".to_string(),
539529
PhysicalPlan::MergeIntoAppendNotMatched(_) => "MergeIntoAppendNotMatched".to_string(),
540530
PhysicalPlan::CteScan(_) => "PhysicalCteScan".to_string(),
541531
PhysicalPlan::MaterializedCte(_) => "PhysicalMaterializedCte".to_string(),
@@ -604,7 +594,6 @@ impl PhysicalPlan {
604594
PhysicalPlan::MergeIntoAddRowNumber(plan) => {
605595
Box::new(std::iter::once(plan.input.as_ref()))
606596
}
607-
PhysicalPlan::MergeIntoSource(plan) => Box::new(std::iter::once(plan.input.as_ref())),
608597
PhysicalPlan::MergeIntoAppendNotMatched(plan) => {
609598
Box::new(std::iter::once(plan.input.as_ref()))
610599
}
@@ -663,7 +652,6 @@ impl PhysicalPlan {
663652
| PhysicalPlan::MergeInto(_)
664653
| PhysicalPlan::MergeIntoAddRowNumber(_)
665654
| PhysicalPlan::MergeIntoAppendNotMatched(_)
666-
| PhysicalPlan::MergeIntoSource(_)
667655
| PhysicalPlan::ConstantTableScan(_)
668656
| PhysicalPlan::CteScan(_)
669657
| PhysicalPlan::ReclusterSource(_)

src/query/sql/src/executor/physical_plan_display.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ use crate::executor::physical_plans::MaterializedCte;
4242
use crate::executor::physical_plans::MergeInto;
4343
use crate::executor::physical_plans::MergeIntoAddRowNumber;
4444
use crate::executor::physical_plans::MergeIntoAppendNotMatched;
45-
use crate::executor::physical_plans::MergeIntoSource;
4645
use crate::executor::physical_plans::Project;
4746
use crate::executor::physical_plans::ProjectSet;
4847
use crate::executor::physical_plans::RangeJoin;
@@ -105,7 +104,6 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> {
105104
PhysicalPlan::ReplaceAsyncSourcer(async_sourcer) => write!(f, "{}", async_sourcer)?,
106105
PhysicalPlan::ReplaceDeduplicate(deduplicate) => write!(f, "{}", deduplicate)?,
107106
PhysicalPlan::ReplaceInto(replace) => write!(f, "{}", replace)?,
108-
PhysicalPlan::MergeIntoSource(merge_into_source) => write!(f, "{}", merge_into_source)?,
109107
PhysicalPlan::MergeInto(merge_into) => write!(f, "{}", merge_into)?,
110108
PhysicalPlan::MergeIntoAppendNotMatched(merge_into_row_id_apply) => {
111109
write!(f, "{}", merge_into_row_id_apply)?
@@ -507,12 +505,6 @@ impl Display for MergeIntoAppendNotMatched {
507505
}
508506
}
509507

510-
impl Display for MergeIntoSource {
511-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
512-
write!(f, "MergeIntoSource")
513-
}
514-
}
515-
516508
impl Display for ReclusterSource {
517509
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
518510
write!(f, "ReclusterSource")

src/query/sql/src/executor/physical_plan_visitor.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ use crate::executor::physical_plans::MaterializedCte;
4646
use crate::executor::physical_plans::MergeInto;
4747
use crate::executor::physical_plans::MergeIntoAddRowNumber;
4848
use crate::executor::physical_plans::MergeIntoAppendNotMatched;
49-
use crate::executor::physical_plans::MergeIntoSource;
5049
use crate::executor::physical_plans::Project;
5150
use crate::executor::physical_plans::ProjectSet;
5251
use crate::executor::physical_plans::RangeJoin;
@@ -97,7 +96,6 @@ pub trait PhysicalPlanReplacer {
9796
PhysicalPlan::ReplaceInto(plan) => self.replace_replace_into(plan),
9897
PhysicalPlan::MergeInto(plan) => self.replace_merge_into(plan),
9998
PhysicalPlan::MergeIntoAddRowNumber(plan) => self.replace_add_row_number(plan),
100-
PhysicalPlan::MergeIntoSource(plan) => self.replace_merge_into_source(plan),
10199
PhysicalPlan::MergeIntoAppendNotMatched(plan) => {
102100
self.replace_merge_into_row_id_apply(plan)
103101
}
@@ -486,14 +484,6 @@ pub trait PhysicalPlanReplacer {
486484
)))
487485
}
488486

489-
fn replace_merge_into_source(&mut self, plan: &MergeIntoSource) -> Result<PhysicalPlan> {
490-
let input = self.replace(&plan.input)?;
491-
Ok(PhysicalPlan::MergeIntoSource(MergeIntoSource {
492-
input: Box::new(input),
493-
..plan.clone()
494-
}))
495-
}
496-
497487
fn replace_merge_into_row_id_apply(
498488
&mut self,
499489
plan: &MergeIntoAppendNotMatched,
@@ -708,9 +698,6 @@ impl PhysicalPlan {
708698
PhysicalPlan::ReplaceInto(plan) => {
709699
Self::traverse(&plan.input, pre_visit, visit, post_visit);
710700
}
711-
PhysicalPlan::MergeIntoSource(plan) => {
712-
Self::traverse(&plan.input, pre_visit, visit, post_visit);
713-
}
714701
PhysicalPlan::MergeInto(plan) => {
715702
Self::traverse(&plan.input, pre_visit, visit, post_visit);
716703
}

0 commit comments

Comments
 (0)