Skip to content

Commit 28a6090

Browse files
authored
chore: add plan id for mutations (#14685)
* add plan id for mutations * add adjust_plan_id * refactor adjust_plan_id
1 parent 3340c38 commit 28a6090

20 files changed

+250
-151
lines changed

src/query/service/src/interpreters/interpreter_delete.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ impl DeleteInterpreter {
260260
col_indices,
261261
query_row_id_col,
262262
snapshot: snapshot.clone(),
263+
plan_id: u32::MAX,
263264
}));
264265

265266
if is_distributed {
@@ -272,8 +273,7 @@ impl DeleteInterpreter {
272273
ignore_exchange: false,
273274
});
274275
}
275-
276-
Ok(PhysicalPlan::CommitSink(Box::new(CommitSink {
276+
let mut plan = PhysicalPlan::CommitSink(Box::new(CommitSink {
277277
input: Box::new(root),
278278
snapshot,
279279
table_info,
@@ -283,7 +283,10 @@ impl DeleteInterpreter {
283283
merge_meta,
284284
need_lock: false,
285285
deduplicated_label: None,
286-
})))
286+
plan_id: u32::MAX,
287+
}));
288+
plan.adjust_plan_id(&mut 0);
289+
Ok(plan)
287290
}
288291
}
289292

src/query/service/src/interpreters/interpreter_merge_into.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,13 +309,15 @@ impl MergeIntoInterpreter {
309309
row_id_idx: row_id_idx as u32,
310310
merge_type: merge_type.clone(),
311311
merge_into_split_idx: merge_into_split_idx as u32,
312+
plan_id: u32::MAX,
312313
})
313314
} else {
314315
PhysicalPlan::MergeIntoSource(MergeIntoSource {
315316
input: Box::new(join_input),
316317
row_id_idx: row_id_idx as u32,
317318
merge_type: merge_type.clone(),
318319
merge_into_split_idx: merge_into_split_idx as u32,
320+
plan_id: u32::MAX,
319321
})
320322
};
321323

@@ -462,6 +464,7 @@ impl MergeIntoInterpreter {
462464
change_join_order: *change_join_order,
463465
target_build_optimization,
464466
can_try_update_column_only: *can_try_update_column_only,
467+
plan_id: u32::MAX,
465468
}))
466469
} else {
467470
let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
@@ -489,6 +492,7 @@ impl MergeIntoInterpreter {
489492
change_join_order: *change_join_order,
490493
target_build_optimization: false, // we don't support for distributed mode for now..
491494
can_try_update_column_only: *can_try_update_column_only,
495+
plan_id: u32::MAX,
492496
}));
493497
// if change_join_order = true, it means the target is build side,
494498
// in this way, we will do matched operation and not matched operation
@@ -514,11 +518,12 @@ impl MergeIntoInterpreter {
514518
merge_type: merge_type.clone(),
515519
change_join_order: *change_join_order,
516520
segments,
521+
plan_id: u32::MAX,
517522
}))
518523
};
519524

520525
// build mutation_aggregate
521-
let physical_plan = PhysicalPlan::CommitSink(Box::new(CommitSink {
526+
let mut physical_plan = PhysicalPlan::CommitSink(Box::new(CommitSink {
522527
input: Box::new(commit_input),
523528
snapshot: base_snapshot,
524529
table_info: table_info.clone(),
@@ -529,8 +534,9 @@ impl MergeIntoInterpreter {
529534
merge_meta: false,
530535
need_lock: false,
531536
deduplicated_label: unsafe { self.ctx.get_settings().get_deduplicate_label()? },
537+
plan_id: u32::MAX,
532538
}));
533-
539+
physical_plan.adjust_plan_id(&mut 0);
534540
Ok((physical_plan, table_info))
535541
}
536542

src/query/service/src/interpreters/interpreter_replace.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ impl ReplaceInterpreter {
295295
table_level_range_index,
296296
need_insert: true,
297297
delete_when,
298+
plan_id: u32::MAX,
298299
},
299300
)));
300301
root = Box::new(PhysicalPlan::ReplaceInto(Box::new(ReplaceInto {
@@ -312,6 +313,7 @@ impl ReplaceInterpreter {
312313
.collect(),
313314
block_slots: None,
314315
need_insert: true,
316+
plan_id: u32::MAX,
315317
})));
316318
if is_distributed {
317319
root = Box::new(PhysicalPlan::Exchange(Exchange {
@@ -333,7 +335,9 @@ impl ReplaceInterpreter {
333335
merge_meta: false,
334336
need_lock: false,
335337
deduplicated_label: unsafe { self.ctx.get_settings().get_deduplicate_label()? },
338+
plan_id: u32::MAX,
336339
})));
340+
root.adjust_plan_id(&mut 0);
337341
Ok((root, purge_info))
338342
}
339343

@@ -401,6 +405,7 @@ impl ReplaceInterpreter {
401405
value_data: value_data.to_string(),
402406
start: span_offset,
403407
schema,
408+
plan_id: u32::MAX,
404409
},
405410
)))
406411
}

src/query/service/src/interpreters/interpreter_table_optimize.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ impl OptimizeTableInterpreter {
116116
table_info: table_info.clone(),
117117
catalog_info: catalog_info.clone(),
118118
column_ids: snapshot.schema.to_leaf_column_id_set(),
119+
plan_id: u32::MAX,
119120
}));
120121

121122
if is_distributed {
@@ -139,6 +140,7 @@ impl OptimizeTableInterpreter {
139140
merge_meta,
140141
need_lock,
141142
deduplicated_label: None,
143+
plan_id: u32::MAX,
142144
})))
143145
}
144146

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ pub fn build_recluster_physical_plan(
233233
tasks,
234234
table_info: table_info.clone(),
235235
catalog_info: catalog_info.clone(),
236+
plan_id: u32::MAX,
236237
}));
237238

238239
if is_distributed {
@@ -245,14 +246,16 @@ pub fn build_recluster_physical_plan(
245246
ignore_exchange: false,
246247
});
247248
}
248-
249-
Ok(PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
249+
let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
250250
input: Box::new(root),
251251
table_info,
252252
catalog_info,
253253
snapshot,
254254
remained_blocks,
255255
removed_segment_indexes,
256256
removed_segment_summary,
257-
})))
257+
plan_id: u32::MAX,
258+
}));
259+
plan.adjust_plan_id(&mut 0);
260+
Ok(plan)
258261
}

src/query/service/src/interpreters/interpreter_update.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ impl UpdateInterpreter {
303303
query_row_id_col,
304304
update_list,
305305
computed_list,
306+
plan_id: u32::MAX,
306307
}));
307308

308309
if is_distributed {
@@ -315,8 +316,7 @@ impl UpdateInterpreter {
315316
ignore_exchange: false,
316317
});
317318
}
318-
319-
Ok(PhysicalPlan::CommitSink(Box::new(CommitSink {
319+
let mut plan = PhysicalPlan::CommitSink(Box::new(CommitSink {
320320
input: Box::new(root),
321321
snapshot,
322322
table_info,
@@ -326,6 +326,9 @@ impl UpdateInterpreter {
326326
merge_meta,
327327
need_lock: false,
328328
deduplicated_label: unsafe { ctx.get_settings().get_deduplicate_label()? },
329-
})))
329+
plan_id: u32::MAX,
330+
}));
331+
plan.adjust_plan_id(&mut 0);
332+
Ok(plan)
330333
}
331334
}

src/query/service/src/pipelines/builders/builder_replace_into.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ impl PipelineBuilder {
100100
segments,
101101
block_slots,
102102
need_insert,
103+
..
103104
} = replace;
104105
let max_threads = self.settings.get_max_threads()?;
105106
let segment_partition_num = std::cmp::min(segments.len(), max_threads as usize);
@@ -262,6 +263,7 @@ impl PipelineBuilder {
262263
target_schema,
263264
need_insert,
264265
delete_when,
266+
..
265267
} = deduplicate;
266268

267269
let tbl = self

0 commit comments

Comments
 (0)