Skip to content

Commit a78859b

Browse files
authored
Merge branch 'main' into chore/result
2 parents e30c4bf + 1c46ff2 commit a78859b

File tree

59 files changed

+222
-205
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+222
-205
lines changed

src/query/service/src/interpreters/access/accessor.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ use crate::interpreters::ManagementModeAccess;
2222
use crate::sessions::QueryContext;
2323
use crate::sql::plans::Plan;
2424

25+
#[async_trait::async_trait]
2526
pub trait AccessChecker: Sync + Send {
2627
// Check the access permission for the old plan.
2728
// TODO(bohu): Remove after new plan done.
28-
fn check(&self, plan: &PlanNode) -> Result<()>;
29+
async fn check(&self, plan: &PlanNode) -> Result<()>;
2930

3031
// Check the access permission for the old plan.
31-
fn check_new(&self, _plan: &Plan) -> Result<()>;
32+
async fn check_new(&self, _plan: &Plan) -> Result<()>;
3233
}
3334

3435
pub struct Accessor {
@@ -42,16 +43,16 @@ impl Accessor {
4243
Accessor { accessors }
4344
}
4445

45-
pub fn check(&self, plan: &PlanNode) -> Result<()> {
46+
pub async fn check(&self, plan: &PlanNode) -> Result<()> {
4647
for accessor in self.accessors.values() {
47-
accessor.check(plan)?;
48+
accessor.check(plan).await?;
4849
}
4950
Ok(())
5051
}
5152

52-
pub fn check_new(&self, plan: &Plan) -> Result<()> {
53+
pub async fn check_new(&self, plan: &Plan) -> Result<()> {
5354
for accessor in self.accessors.values() {
54-
accessor.check_new(plan)?;
55+
accessor.check_new(plan).await?;
5556
}
5657
Ok(())
5758
}

src/query/service/src/interpreters/access/management_mode_access.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ impl ManagementModeAccess {
3333
}
3434
}
3535

36+
#[async_trait::async_trait]
3637
impl AccessChecker for ManagementModeAccess {
3738
// Check what we can do if in management mode.
38-
fn check(&self, plan: &PlanNode) -> Result<()> {
39+
async fn check(&self, plan: &PlanNode) -> Result<()> {
3940
// Allows for management-mode.
4041
if self.ctx.get_config().query.management_mode {
4142
return match plan {
@@ -50,7 +51,7 @@ impl AccessChecker for ManagementModeAccess {
5051
}
5152

5253
// Check what we can do if in management mode.
53-
fn check_new(&self, plan: &Plan) -> Result<()> {
54+
async fn check_new(&self, plan: &Plan) -> Result<()> {
5455
// Allows for management-mode.
5556
if self.ctx.get_config().query.management_mode {
5657
let ok = match plan {

src/query/service/src/interpreters/interpreter_factory.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ pub struct InterpreterFactory;
3636
/// InterpreterFactory provides `get` method which transforms the PlanNode into the corresponding interpreter.
3737
/// Such as: SelectPlan -> SelectInterpreter, ExplainPlan -> ExplainInterpreter, ...
3838
impl InterpreterFactory {
39-
pub fn get(ctx: Arc<QueryContext>, plan: PlanNode) -> Result<Arc<dyn Interpreter>> {
39+
pub async fn get(ctx: Arc<QueryContext>, plan: PlanNode) -> Result<Arc<dyn Interpreter>> {
4040
// Check the access permission.
4141
let access_checker = Accessor::create(ctx.clone());
4242
access_checker
4343
.check(&plan)
44+
.await
4445
.map(|e| error!("Access.denied(v1): {:?}", e))?;
4546

4647
let inner = Self::create_interpreter(ctx.clone(), &plan)?;

src/query/service/src/interpreters/interpreter_factory_v2.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@ impl InterpreterFactoryV2 {
4343
matches!(stmt, DfStatement::SeeYouAgain)
4444
}
4545

46-
pub fn get(ctx: Arc<QueryContext>, plan: &Plan) -> Result<InterpreterPtr> {
46+
pub async fn get(ctx: Arc<QueryContext>, plan: &Plan) -> Result<InterpreterPtr> {
4747
// Check the access permission.
4848
let access_checker = Accessor::create(ctx.clone());
4949
access_checker
5050
.check_new(plan)
51+
.await
5152
.map(|e| error!("Access.denied(v2): {:?}", e))?;
5253

5354
let inner = InterpreterFactoryV2::create_interpreter(ctx.clone(), plan)?;

src/query/service/src/servers/http/clickhouse_handler.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,9 @@ pub async fn clickhouse_handler_get(
221221
let format = get_format_from_plan(&plan, format)?;
222222

223223
context.attach_query_str(&sql);
224-
let interpreter = InterpreterFactoryV2::get(context.clone(), &plan).map_err(BadRequest)?;
224+
let interpreter = InterpreterFactoryV2::get(context.clone(), &plan)
225+
.await
226+
.map_err(BadRequest)?;
225227
execute(context, interpreter, plan.schema(), format, None, params)
226228
.await
227229
.map_err(InternalServerError)
@@ -233,7 +235,9 @@ pub async fn clickhouse_handler_get(
233235
context.attach_query_str(&sql);
234236
let format = get_format_with_default(format, default_format)?;
235237
let schema = plan.schema();
236-
let interpreter = InterpreterFactory::get(context.clone(), plan).map_err(BadRequest)?;
238+
let interpreter = InterpreterFactory::get(context.clone(), plan)
239+
.await
240+
.map_err(BadRequest)?;
237241
execute(context, interpreter, schema, format, None, params)
238242
.await
239243
.map_err(InternalServerError)
@@ -293,7 +297,9 @@ pub async fn clickhouse_handler_post(
293297
let format = get_format_with_default(fmt, default_format)?;
294298
let format = get_format_from_plan(&plan, format)?;
295299
ctx.attach_query_str(&sql);
296-
let interpreter = InterpreterFactoryV2::get(ctx.clone(), &plan).map_err(BadRequest)?;
300+
let interpreter = InterpreterFactoryV2::get(ctx.clone(), &plan)
301+
.await
302+
.map_err(BadRequest)?;
297303

298304
execute(ctx, interpreter, plan.schema(), format, None, params)
299305
.await
@@ -308,7 +314,9 @@ pub async fn clickhouse_handler_post(
308314
let format = get_format_with_default(format, default_format)?;
309315

310316
let schema = plan.schema();
311-
let interpreter = InterpreterFactory::get(ctx.clone(), plan).map_err(BadRequest)?;
317+
let interpreter = InterpreterFactory::get(ctx.clone(), plan)
318+
.await
319+
.map_err(BadRequest)?;
312320
execute(ctx, interpreter, schema, format, None, params)
313321
.await
314322
.map_err(InternalServerError)

src/query/service/src/servers/http/v1/load.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ fn execute_query(
7272
source_builder: SourcePipeBuilder,
7373
) -> impl Future<Output = Result<()>> {
7474
async move {
75-
let interpreter = InterpreterFactory::get(context.clone(), node)?;
75+
let interpreter = InterpreterFactory::get(context.clone(), node).await?;
7676

7777
if let Err(cause) = interpreter.start().await {
7878
error!("interpreter.start error: {:?}", cause);
@@ -202,8 +202,9 @@ pub async fn streaming_load(
202202
StatusCode::BAD_REQUEST,
203203
)),
204204
}?;
205-
let interpreter =
206-
InterpreterFactory::get(context.clone(), plan.clone()).map_err(InternalServerError)?;
205+
let interpreter = InterpreterFactory::get(context.clone(), plan.clone())
206+
.await
207+
.map_err(InternalServerError)?;
207208
let _ = interpreter
208209
.set_source_pipe_builder(Some(source_pipe_builder))
209210
.map_err(|e| error!("interpreter.set_source_pipe_builder.error: {:?}", e));

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ impl ExecuteState {
245245
let mut planner = Planner::new(ctx.clone());
246246
let (plan, _, _) = planner.plan_sql(sql).await?;
247247
is_select = matches!(&plan, Plan::Query { .. });
248-
InterpreterFactoryV2::get(ctx.clone(), &plan)
248+
InterpreterFactoryV2::get(ctx.clone(), &plan).await
249249
} else {
250250
let plan = match PlanParser::parse(ctx.clone(), sql).await {
251251
Ok(p) => p,
@@ -256,7 +256,7 @@ impl ExecuteState {
256256
};
257257

258258
is_select = matches!(&plan, PlanNode::Select(_));
259-
InterpreterFactory::get(ctx.clone(), plan)
259+
InterpreterFactory::get(ctx.clone(), plan).await
260260
}?;
261261

262262
if is_v2 && is_select {

src/query/service/src/servers/mysql/mysql_interactive_worker.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -344,19 +344,23 @@ impl<W: AsyncWrite + Send + Unpin> InteractiveWorkerBase<W> {
344344
Ok((_, h)) => h.clone(),
345345
Err(_) => vec![],
346346
};
347-
let mut has_result_set = true;
348-
let interpreter = if settings.get_enable_planner_v2()? != 0 {
347+
348+
let (interpreter, has_result_set) = if settings.get_enable_planner_v2()? != 0 {
349349
let mut planner = Planner::new(context.clone());
350-
planner.plan_sql(query).await.and_then(|v| {
351-
has_result_set = has_result_set_by_plan(&v.0);
352-
InterpreterFactoryV2::get(context.clone(), &v.0)
353-
})
350+
let plan = planner.plan_sql(query).await?;
351+
let has_result_set = has_result_set_by_plan(&plan.0);
352+
(
353+
InterpreterFactoryV2::get(context.clone(), &plan.0).await,
354+
has_result_set,
355+
)
354356
} else {
355-
let (plan, _) = PlanParser::parse_with_hint(query, context.clone()).await;
356-
plan.and_then(|v| {
357-
has_result_set = has_result_set_by_plan_node(&v);
358-
InterpreterFactory::get(context.clone(), v)
359-
})
357+
let (plan_res, _) = PlanParser::parse_with_hint(query, context.clone()).await;
358+
let plan = plan_res?;
359+
let has_result_set = has_result_set_by_plan_node(&plan);
360+
(
361+
InterpreterFactory::get(context.clone(), plan).await,
362+
has_result_set,
363+
)
360364
};
361365

362366
let hint = hints

src/query/service/tests/it/api/http/status.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async fn run_query(query_ctx: &Arc<QueryContext>) -> Result<Arc<dyn Interpreter>
6262
.await?;
6363
query_ctx.set_current_user(user);
6464
let plan = PlanParser::parse(query_ctx.clone(), sql).await?;
65-
InterpreterFactory::get(query_ctx.clone(), plan)
65+
InterpreterFactory::get(query_ctx.clone(), plan).await
6666
}
6767

6868
#[tokio::test]

src/query/service/tests/it/interpreters/access/management_mode_access.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,15 @@ async fn test_management_mode_access() -> Result<()> {
159159
for test in group.tests {
160160
let (plan, _, _) = planner.plan_sql(test.query).await?;
161161
if test.is_err {
162-
let res = InterpreterFactoryV2::get(ctx.clone(), &plan);
162+
let res = InterpreterFactoryV2::get(ctx.clone(), &plan).await;
163163
assert_eq!(
164164
test.is_err,
165165
res.is_err(),
166166
"in test case:{:?}",
167167
(group.name, test.name)
168168
);
169169
} else {
170-
let interpreter = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
170+
let interpreter = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
171171
interpreter.execute(ctx.clone()).await?;
172172
}
173173
}

0 commit comments

Comments
 (0)