Skip to content

Commit 057e341

Browse files
committed
Make InterpreterFactor.get to async
1 parent 8ae33f0 commit 057e341

15 files changed

+49
-38
lines changed

src/query/service/src/interpreters/access/accessor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::sql::plans::Plan;
2626
pub trait AccessChecker: Sync + Send {
2727
// Check the access permission for the old plan.
2828
// TODO(bohu): Remove after new plan done.
29-
fn check(&self, plan: &PlanNode) -> Result<()>;
29+
async fn check(&self, plan: &PlanNode) -> Result<()>;
3030

3131
// Check the access permission for the old plan.
3232
async fn check_new(&self, _plan: &Plan) -> Result<()>;
@@ -43,9 +43,9 @@ impl Accessor {
4343
Accessor { accessors }
4444
}
4545

46-
pub fn check(&self, plan: &PlanNode) -> Result<()> {
46+
pub async fn check(&self, plan: &PlanNode) -> Result<()> {
4747
for accessor in self.accessors.values() {
48-
accessor.check(plan)?;
48+
accessor.check(plan).await?;
4949
}
5050
Ok(())
5151
}

src/query/service/src/interpreters/access/management_mode_access.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl ManagementModeAccess {
3636
#[async_trait::async_trait]
3737
impl AccessChecker for ManagementModeAccess {
3838
// Check what we can do if in management mode.
39-
fn check(&self, plan: &PlanNode) -> Result<()> {
39+
async fn check(&self, plan: &PlanNode) -> Result<()> {
4040
// Allows for management-mode.
4141
if self.ctx.get_config().query.management_mode {
4242
return match plan {

src/query/service/src/interpreters/interpreter_factory.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ pub struct InterpreterFactory;
3636
/// InterpreterFactory provides `get` method which transforms the PlanNode into the corresponding interpreter.
3737
/// Such as: SelectPlan -> SelectInterpreter, ExplainPlan -> ExplainInterpreter, ...
3838
impl InterpreterFactory {
39-
pub fn get(ctx: Arc<QueryContext>, plan: PlanNode) -> Result<Arc<dyn Interpreter>> {
39+
pub async fn get(ctx: Arc<QueryContext>, plan: PlanNode) -> Result<Arc<dyn Interpreter>> {
4040
// Check the access permission.
4141
let access_checker = Accessor::create(ctx.clone());
4242
access_checker
4343
.check(&plan)
44+
.await
4445
.map(|e| error!("Access.denied(v1): {:?}", e))?;
4546

4647
let inner = Self::create_interpreter(ctx.clone(), &plan)?;

src/query/service/src/servers/http/clickhouse_handler.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,9 @@ pub async fn clickhouse_handler_get(
235235
context.attach_query_str(&sql);
236236
let format = get_format_with_default(format, default_format)?;
237237
let schema = plan.schema();
238-
let interpreter = InterpreterFactory::get(context.clone(), plan).map_err(BadRequest)?;
238+
let interpreter = InterpreterFactory::get(context.clone(), plan)
239+
.await
240+
.map_err(BadRequest)?;
239241
execute(context, interpreter, schema, format, None, params)
240242
.await
241243
.map_err(InternalServerError)
@@ -312,7 +314,9 @@ pub async fn clickhouse_handler_post(
312314
let format = get_format_with_default(format, default_format)?;
313315

314316
let schema = plan.schema();
315-
let interpreter = InterpreterFactory::get(ctx.clone(), plan).map_err(BadRequest)?;
317+
let interpreter = InterpreterFactory::get(ctx.clone(), plan)
318+
.await
319+
.map_err(BadRequest)?;
316320
execute(ctx, interpreter, schema, format, None, params)
317321
.await
318322
.map_err(InternalServerError)

src/query/service/src/servers/http/v1/load.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ fn execute_query(
7272
source_builder: SourcePipeBuilder,
7373
) -> impl Future<Output = Result<()>> {
7474
async move {
75-
let interpreter = InterpreterFactory::get(context.clone(), node)?;
75+
let interpreter = InterpreterFactory::get(context.clone(), node).await?;
7676

7777
if let Err(cause) = interpreter.start().await {
7878
error!("interpreter.start error: {:?}", cause);
@@ -202,8 +202,9 @@ pub async fn streaming_load(
202202
StatusCode::BAD_REQUEST,
203203
)),
204204
}?;
205-
let interpreter =
206-
InterpreterFactory::get(context.clone(), plan.clone()).map_err(InternalServerError)?;
205+
let interpreter = InterpreterFactory::get(context.clone(), plan.clone())
206+
.await
207+
.map_err(InternalServerError)?;
207208
let _ = interpreter
208209
.set_source_pipe_builder(Some(source_pipe_builder))
209210
.map_err(|e| error!("interpreter.set_source_pipe_builder.error: {:?}", e));

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ impl ExecuteState {
256256
};
257257

258258
is_select = matches!(&plan, PlanNode::Select(_));
259-
InterpreterFactory::get(ctx.clone(), plan)
259+
InterpreterFactory::get(ctx.clone(), plan).await
260260
}?;
261261

262262
if is_v2 && is_select {

src/query/service/src/servers/mysql/mysql_interactive_worker.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -344,18 +344,23 @@ impl<W: AsyncWrite + Send + Unpin> InteractiveWorkerBase<W> {
344344
Ok((_, h)) => h.clone(),
345345
Err(_) => vec![],
346346
};
347-
let mut has_result_set = true;
348-
let interpreter = if settings.get_enable_planner_v2()? != 0 {
347+
348+
let (interpreter, has_result_set) = if settings.get_enable_planner_v2()? != 0 {
349349
let mut planner = Planner::new(context.clone());
350-
let plan_res = planner.plan_sql(query).await?;
351-
has_result_set = has_result_set_by_plan(&plan_res.0);
352-
InterpreterFactoryV2::get(context.clone(), &plan_res.0).await
350+
let plan = planner.plan_sql(query).await?;
351+
let has_result_set = has_result_set_by_plan(&plan.0);
352+
(
353+
InterpreterFactoryV2::get(context.clone(), &plan.0).await,
354+
has_result_set,
355+
)
353356
} else {
354-
let (plan, _) = PlanParser::parse_with_hint(query, context.clone()).await;
355-
plan.and_then(|v| {
356-
has_result_set = has_result_set_by_plan_node(&v);
357-
InterpreterFactory::get(context.clone(), v)
358-
})
357+
let (plan_res, _) = PlanParser::parse_with_hint(query, context.clone()).await;
358+
let plan = plan_res?;
359+
let has_result_set = has_result_set_by_plan_node(&plan);
360+
(
361+
InterpreterFactory::get(context.clone(), plan).await,
362+
has_result_set,
363+
)
359364
};
360365

361366
let hint = hints

src/query/service/tests/it/api/http/status.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async fn run_query(query_ctx: &Arc<QueryContext>) -> Result<Arc<dyn Interpreter>
6262
.await?;
6363
query_ctx.set_current_user(user);
6464
let plan = PlanParser::parse(query_ctx.clone(), sql).await?;
65-
InterpreterFactory::get(query_ctx.clone(), plan)
65+
InterpreterFactory::get(query_ctx.clone(), plan).await
6666
}
6767

6868
#[tokio::test]

src/query/service/tests/it/interpreters/async_insert_queue.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ async fn test_async_insert_queue() -> Result<()> {
134134
{
135135
let query = "select * from default.test";
136136
let plan = PlanParser::parse(ctx.clone(), query).await?;
137-
let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?;
137+
let executor = InterpreterFactory::get(ctx.clone(), plan.clone()).await?;
138138
let stream = executor.execute(ctx.clone()).await?;
139139
let result = stream.try_collect::<Vec<_>>().await?;
140140
let expected = vec![
@@ -390,7 +390,7 @@ async fn test_async_insert_queue_no_wait() -> Result<()> {
390390
{
391391
let query = "select * from default.test";
392392
let plan = PlanParser::parse(ctx.clone(), query).await?;
393-
let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?;
393+
let executor = InterpreterFactory::get(ctx.clone(), plan.clone()).await?;
394394
let stream = executor.execute(ctx.clone()).await?;
395395
let result = stream.try_collect::<Vec<_>>().await?;
396396
let expected = vec![

src/query/service/tests/it/interpreters/interpreter_explain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async fn test_explain_interpreter() -> Result<()> {
2929
";
3030

3131
let plan = PlanParser::parse(ctx.clone(), query).await?;
32-
let executor = InterpreterFactory::get(ctx.clone(), plan)?;
32+
let executor = InterpreterFactory::get(ctx.clone(), plan).await?;
3333
assert_eq!(executor.name(), "ExplainInterpreter");
3434

3535
let stream = executor.execute(ctx).await?;

0 commit comments

Comments
 (0)