Skip to content

Commit 87713d1

Browse files
authored
chore(planner): make optimize function async (#15121)
chore: make optimize async
1 parent 982c219 commit 87713d1

File tree

8 files changed

+18
-32
lines changed

8 files changed

+18
-32
lines changed

src/query/service/src/interpreters/interpreter_delete.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ pub async fn subquery_filter(
386386
.with_enable_join_reorder(unsafe { !ctx.get_settings().get_disable_join_reorder()? })
387387
.with_enable_dphyp(ctx.get_settings().get_enable_dphyp()?);
388388

389-
s_expr = optimize_query(opt_ctx, s_expr.clone())?;
389+
s_expr = optimize_query(opt_ctx, s_expr.clone()).await?;
390390

391391
// Create `input_expr` pipeline and execute it to get `_row_id` data block.
392392
let select_interpreter = SelectInterpreter::try_create(

src/query/sql/src/planner/binder/ddl/index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ impl Binder {
319319
let plan = if let Statement::Query(_) = &stmt {
320320
let select_plan = self.bind_statement(bind_context, &stmt).await?;
321321
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone());
322-
Ok(optimize(opt_ctx, select_plan)?)
322+
Ok(optimize(opt_ctx, select_plan).await?)
323323
} else {
324324
Err(ErrorCode::UnsupportedIndex("statement is not query"))
325325
};

src/query/sql/src/planner/binder/ddl/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ impl Binder {
665665
let select_plan = self.bind_statement(&mut bind_context, &stmt).await?;
666666
// Don't enable distributed optimization for `CREATE TABLE ... AS SELECT ...` for now
667667
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone());
668-
let optimized_plan = optimize(opt_ctx, select_plan)?;
668+
let optimized_plan = optimize(opt_ctx, select_plan).await?;
669669
Some(Box::new(optimized_plan))
670670
} else {
671671
None

src/query/sql/src/planner/binder/insert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ impl Binder {
181181
}
182182
}
183183

184-
let optimized_plan = optimize(opt_ctx, select_plan)?;
184+
let optimized_plan = optimize(opt_ctx, select_plan).await?;
185185
Ok(InsertInputSource::SelectPlan(Box::new(optimized_plan)))
186186
}
187187
};

src/query/sql/src/planner/binder/replace.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ impl Binder {
166166
}
167167
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone())
168168
.with_enable_distributed_optimization(false);
169-
let optimized_plan = optimize(opt_ctx, select_plan)?;
169+
let optimized_plan = optimize(opt_ctx, select_plan).await?;
170170
Ok(InsertInputSource::SelectPlan(Box::new(optimized_plan)))
171171
}
172172
};

src/query/sql/src/planner/dataframe.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,8 @@ use databend_common_exception::Result;
3535
use databend_common_expression::DataSchemaRef;
3636
use parking_lot::RwLock;
3737

38-
use crate::optimizer::optimize;
39-
use crate::optimizer::OptimizerContext;
4038
use crate::planner::optimizer::s_expr::SExpr;
4139
use crate::plans::Limit;
42-
use crate::plans::Plan;
4340
use crate::BindContext;
4441
use crate::Binder;
4542
use crate::Metadata;
@@ -548,20 +545,6 @@ impl Dataframe {
548545
pub fn get_expr(&self) -> &SExpr {
549546
&self.s_expr
550547
}
551-
552-
pub fn into_plan(self, enable_distributed_optimization: bool) -> Result<Plan> {
553-
let plan = Plan::Query {
554-
s_expr: Box::new(self.s_expr),
555-
metadata: self.binder.metadata.clone(),
556-
bind_context: Box::new(self.bind_context),
557-
rewrite_kind: None,
558-
ignore_result: false,
559-
formatted_ast: None,
560-
};
561-
let opt_ctx = OptimizerContext::new(self.query_ctx.clone(), self.binder.metadata.clone())
562-
.with_enable_distributed_optimization(enable_distributed_optimization);
563-
optimize(opt_ctx, plan)
564-
}
565548
}
566549

567550
fn parse_cols(schema: DataSchemaRef, columns: &[&str]) -> Result<Vec<SelectTarget>> {

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ impl<'a> RecursiveOptimizer<'a> {
148148
}
149149

150150
#[minitrace::trace]
151-
pub fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan> {
151+
#[async_backtrace::framed]
152+
pub async fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan> {
152153
match plan {
153154
Plan::Query {
154155
s_expr,
@@ -158,7 +159,7 @@ pub fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan> {
158159
formatted_ast,
159160
ignore_result,
160161
} => Ok(Plan::Query {
161-
s_expr: Box::new(optimize_query(opt_ctx, *s_expr)?),
162+
s_expr: Box::new(optimize_query(opt_ctx, *s_expr).await?),
162163
bind_context,
163164
metadata,
164165
rewrite_kind,
@@ -185,7 +186,7 @@ pub fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan> {
185186
}
186187
_ => {
187188
if config.optimized || !config.logical {
188-
let optimized_plan = optimize(opt_ctx.clone(), *plan)?;
189+
let optimized_plan = Box::pin(optimize(opt_ctx.clone(), *plan)).await?;
189190
Ok(Plan::Explain {
190191
kind,
191192
config,
@@ -197,13 +198,13 @@ pub fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan> {
197198
}
198199
},
199200
Plan::ExplainAnalyze { plan } => Ok(Plan::ExplainAnalyze {
200-
plan: Box::new(optimize(opt_ctx, *plan)?),
201+
plan: Box::new(Box::pin(optimize(opt_ctx, *plan)).await?),
201202
}),
202203
Plan::CopyIntoLocation(CopyIntoLocationPlan { stage, path, from }) => {
203204
Ok(Plan::CopyIntoLocation(CopyIntoLocationPlan {
204205
stage,
205206
path,
206-
from: Box::new(optimize(opt_ctx, *from)?),
207+
from: Box::new(Box::pin(optimize(opt_ctx, *from)).await?),
207208
}))
208209
}
209210
Plan::CopyIntoTable(mut plan) if !plan.no_file_to_copy => {
@@ -218,14 +219,15 @@ pub fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan> {
218219
);
219220
Ok(Plan::CopyIntoTable(plan))
220221
}
221-
Plan::MergeInto(plan) => optimize_merge_into(opt_ctx.clone(), plan),
222+
Plan::MergeInto(plan) => optimize_merge_into(opt_ctx.clone(), plan).await,
222223

223224
// Pass through statements.
224225
_ => Ok(plan),
225226
}
226227
}
227228

228-
pub fn optimize_query(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Result<SExpr> {
229+
#[async_backtrace::framed]
230+
pub async fn optimize_query(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Result<SExpr> {
229231
let enable_distributed_query = opt_ctx.enable_distributed_optimization
230232
&& !contains_local_table_scan(&s_expr, &opt_ctx.metadata);
231233

@@ -345,12 +347,13 @@ fn get_optimized_memo(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Result<Me
345347
Ok(cascades.memo)
346348
}
347349

348-
fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box<MergeInto>) -> Result<Plan> {
350+
#[async_backtrace::framed]
351+
async fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box<MergeInto>) -> Result<Plan> {
349352
// optimize source :fix issue #13733
350353
// reason: if there is subquery,windowfunc exprs etc. see
351354
// src/planner/semantic/lowering.rs `as_raw_expr()`, we will
352355
// get dummy index. So we need to use optimizer to solve this.
353-
let mut right_source = optimize_query(opt_ctx.clone(), plan.input.child(1)?.clone())?;
356+
let mut right_source = optimize_query(opt_ctx.clone(), plan.input.child(1)?.clone()).await?;
354357

355358
// if it's not distributed execution, we should reserve
356359
// exchange to merge source data.

src/query/sql/src/planner/planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ impl Planner {
164164
})
165165
.with_enable_dphyp(self.ctx.get_settings().get_enable_dphyp()?);
166166

167-
let optimized_plan = optimize(opt_ctx, plan)?;
167+
let optimized_plan = optimize(opt_ctx, plan).await?;
168168
Ok((optimized_plan, PlanExtras {
169169
metadata,
170170
format,

0 commit comments

Comments
 (0)