Skip to content

Commit 41a175f

Browse files
committed
update
1 parent 53f4218 commit 41a175f

File tree

4 files changed

+52
-40
lines changed

4 files changed

+52
-40
lines changed

src/query/service/src/interpreters/interpreter_append.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,12 @@ impl Interpreter for AppendInterpreter {
7575
let append: Append = match &self.s_expr.plan() {
7676
RelOperator::Append(append) => append.clone(),
7777
RelOperator::Exchange(_) => self.s_expr.child(0).unwrap().plan().clone().try_into()?,
78-
_ => unreachable!(),
78+
plan => {
79+
return Err(ErrorCode::Internal(format!(
80+
"AppendInterpreter: unexpected plan type: {:?}",
81+
plan
82+
)));
83+
}
7984
};
8085
let (target_table, catalog, database, table) = {
8186
let metadata = self.metadata.read();

src/query/sql/src/planner/optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub use hyper_dp::DPhpy;
4343
pub use m_expr::MExpr;
4444
pub use memo::Memo;
4545
pub use optimizer::optimize;
46+
pub use optimizer::optimize_append;
4647
pub use optimizer::optimize_query;
4748
pub use optimizer::OptimizerContext;
4849
pub use optimizer::RecursiveOptimizer;

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 41 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -295,44 +295,17 @@ pub async fn optimize(mut opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan>
295295
overwrite,
296296
forbid_occ_retry,
297297
} => {
298-
let support_distributed_insert = {
299-
let append: Append = s_expr.plan().clone().try_into()?;
300-
let metadata = metadata.read();
301-
metadata
302-
.table(append.table_index)
303-
.table()
304-
.support_distributed_insert()
305-
};
306-
let enable_distributed = opt_ctx.enable_distributed_optimization
307-
&& opt_ctx
308-
.table_ctx
309-
.get_settings()
310-
.get_enable_distributed_copy()?
311-
&& support_distributed_insert;
312-
info!(
313-
"after optimization enable_distributed_copy? : {}",
314-
enable_distributed
315-
);
316-
let mut optimized_source =
317-
optimize_query(&mut opt_ctx, s_expr.child(0)?.clone()).await?;
318-
let optimized = match enable_distributed {
319-
true => {
320-
if let RelOperator::Exchange(Exchange::Merge) = optimized_source.plan.as_ref() {
321-
optimized_source = optimized_source.child(0).unwrap().clone();
322-
}
323-
let copy_into = SExpr::create_unary(
324-
Arc::new(s_expr.plan().clone()),
325-
Arc::new(optimized_source),
326-
);
327-
let exchange = Arc::new(RelOperator::Exchange(Exchange::Merge));
328-
SExpr::create_unary(exchange, Arc::new(copy_into))
329-
}
330-
false => {
331-
SExpr::create_unary(Arc::new(s_expr.plan().clone()), Arc::new(optimized_source))
332-
}
333-
};
298+
let append: Append = s_expr.plan().clone().try_into()?;
299+
let source = s_expr.child(0)?.clone();
300+
let optimized_source = optimize_query(&mut opt_ctx, source).await?;
301+
let optimized_append = optimize_append(
302+
append,
303+
optimized_source,
304+
metadata.clone(),
305+
opt_ctx.table_ctx.as_ref(),
306+
)?;
334307
Ok(Plan::Append {
335-
s_expr: Box::new(optimized),
308+
s_expr: Box::new(optimized_append),
336309
metadata,
337310
stage_table_info,
338311
overwrite,
@@ -610,3 +583,34 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu
610583
metadata: opt_ctx.metadata.clone(),
611584
})
612585
}
586+
587+
pub fn optimize_append(
588+
append: Append,
589+
source: SExpr,
590+
metadata: MetadataRef,
591+
table_ctx: &dyn TableContext,
592+
) -> Result<SExpr> {
593+
let support_distributed_insert = {
594+
let metadata = metadata.read();
595+
metadata
596+
.table(append.table_index)
597+
.table()
598+
.support_distributed_insert()
599+
};
600+
let enable_distributed = table_ctx.get_settings().get_enable_distributed_copy()?
601+
&& support_distributed_insert
602+
&& matches!(source.plan(), RelOperator::Exchange(Exchange::Merge));
603+
info!(
604+
"after optimization enable_distributed_copy? : {}",
605+
enable_distributed
606+
);
607+
match enable_distributed {
608+
true => {
609+
let source = source.child(0).unwrap().clone();
610+
let copy_into = SExpr::create_unary(Arc::new(append.into()), Arc::new(source));
611+
let exchange = Arc::new(RelOperator::Exchange(Exchange::Merge));
612+
Ok(SExpr::create_unary(exchange, Arc::new(copy_into)))
613+
}
614+
false => Ok(SExpr::create_unary(Arc::new(append.into()), Arc::new(source))),
615+
}
616+
}

src/query/sql/src/planner/plans/append.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::executor::physical_plans::PhysicalAppend;
4444
use crate::executor::PhysicalPlan;
4545
use crate::executor::PhysicalPlanBuilder;
4646
use crate::optimizer::optimize;
47+
use crate::optimizer::optimize_append;
4748
use crate::optimizer::OptimizerContext;
4849
use crate::optimizer::SExpr;
4950
use crate::ColumnBinding;
@@ -111,9 +112,10 @@ pub async fn create_append_plan_from_subquery(
111112
append_type: AppendType::Insert,
112113
};
113114

114-
let s_expr = SExpr::create_unary(Arc::new(insert_plan.into()), Arc::new(source));
115+
let optimized_append = optimize_append(insert_plan, source, metadata.clone(), ctx.as_ref())?;
116+
115117
let plan = Plan::Append {
116-
s_expr: Box::new(s_expr),
118+
s_expr: Box::new(optimized_append),
117119
metadata: metadata.clone(),
118120
stage_table_info: None,
119121
overwrite,

0 commit comments

Comments
 (0)