Skip to content

Commit c355ab9

Browse files
committed
chore(query): change InterpreterFactoryV2.get to async
1 parent ae3855e commit c355ab9

File tree

52 files changed

+169
-163
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+169
-163
lines changed

src/query/service/src/interpreters/access/accessor.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ use crate::interpreters::ManagementModeAccess;
2222
use crate::sessions::QueryContext;
2323
use crate::sql::plans::Plan;
2424

25+
#[async_trait::async_trait]
2526
pub trait AccessChecker: Sync + Send {
2627
// Check the access permission for the old plan.
2728
// TODO(bohu): Remove after new plan done.
2829
fn check(&self, plan: &PlanNode) -> Result<()>;
2930

3031
// Check the access permission for the old plan.
31-
fn check_new(&self, _plan: &Plan) -> Result<()>;
32+
async fn check_new(&self, _plan: &Plan) -> Result<()>;
3233
}
3334

3435
pub struct Accessor {
@@ -49,9 +50,9 @@ impl Accessor {
4950
Ok(())
5051
}
5152

52-
pub fn check_new(&self, plan: &Plan) -> Result<()> {
53+
pub async fn check_new(&self, plan: &Plan) -> Result<()> {
5354
for accessor in self.accessors.values() {
54-
accessor.check_new(plan)?;
55+
accessor.check_new(plan).await?;
5556
}
5657
Ok(())
5758
}

src/query/service/src/interpreters/interpreter_factory_v2.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@ impl InterpreterFactoryV2 {
4343
matches!(stmt, DfStatement::SeeYouAgain)
4444
}
4545

46-
pub fn get(ctx: Arc<QueryContext>, plan: &Plan) -> Result<InterpreterPtr> {
46+
pub async fn get(ctx: Arc<QueryContext>, plan: &Plan) -> Result<InterpreterPtr> {
4747
// Check the access permission.
4848
let access_checker = Accessor::create(ctx.clone());
4949
access_checker
5050
.check_new(plan)
51+
.await
5152
.map(|e| error!("Access.denied(v2): {:?}", e))?;
5253

5354
let inner = InterpreterFactoryV2::create_interpreter(ctx.clone(), plan)?;

src/query/service/src/servers/http/clickhouse_handler.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,9 @@ pub async fn clickhouse_handler_get(
221221
let format = get_format_from_plan(&plan, format)?;
222222

223223
context.attach_query_str(&sql);
224-
let interpreter = InterpreterFactoryV2::get(context.clone(), &plan).map_err(BadRequest)?;
224+
let interpreter = InterpreterFactoryV2::get(context.clone(), &plan)
225+
.await
226+
.map_err(BadRequest)?;
225227
execute(context, interpreter, plan.schema(), format, None, params)
226228
.await
227229
.map_err(InternalServerError)
@@ -293,7 +295,9 @@ pub async fn clickhouse_handler_post(
293295
let format = get_format_with_default(fmt, default_format)?;
294296
let format = get_format_from_plan(&plan, format)?;
295297
ctx.attach_query_str(&sql);
296-
let interpreter = InterpreterFactoryV2::get(ctx.clone(), &plan).map_err(BadRequest)?;
298+
let interpreter = InterpreterFactoryV2::get(ctx.clone(), &plan)
299+
.await
300+
.map_err(BadRequest)?;
297301

298302
execute(ctx, interpreter, plan.schema(), format, None, params)
299303
.await

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ impl ExecuteState {
245245
let mut planner = Planner::new(ctx.clone());
246246
let (plan, _, _) = planner.plan_sql(sql).await?;
247247
is_select = matches!(&plan, Plan::Query { .. });
248-
InterpreterFactoryV2::get(ctx.clone(), &plan)
248+
InterpreterFactoryV2::get(ctx.clone(), &plan).await
249249
} else {
250250
let plan = match PlanParser::parse(ctx.clone(), sql).await {
251251
Ok(p) => p,

src/query/service/src/servers/mysql/mysql_interactive_worker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,9 +347,9 @@ impl<W: AsyncWrite + Send + Unpin> InteractiveWorkerBase<W> {
347347
let mut has_result_set = true;
348348
let interpreter = if settings.get_enable_planner_v2()? != 0 {
349349
let mut planner = Planner::new(context.clone());
350-
planner.plan_sql(query).await.and_then(|v| {
350+
planner.plan_sql(query).await.and_then(|v| async move {
351351
has_result_set = has_result_set_by_plan(&v.0);
352-
InterpreterFactoryV2::get(context.clone(), &v.0)
352+
InterpreterFactoryV2::get(context.clone(), &v.0).await
353353
})
354354
} else {
355355
let (plan, _) = PlanParser::parse_with_hint(query, context.clone()).await;

src/query/service/tests/it/interpreters/access/management_mode_access.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,15 @@ async fn test_management_mode_access() -> Result<()> {
159159
for test in group.tests {
160160
let (plan, _, _) = planner.plan_sql(test.query).await?;
161161
if test.is_err {
162-
let res = InterpreterFactoryV2::get(ctx.clone(), &plan);
162+
let res = InterpreterFactoryV2::get(ctx.clone(), &plan).await;
163163
assert_eq!(
164164
test.is_err,
165165
res.is_err(),
166166
"in test case:{:?}",
167167
(group.name, test.name)
168168
);
169169
} else {
170-
let interpreter = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
170+
let interpreter = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
171171
interpreter.execute(ctx.clone()).await?;
172172
}
173173
}

src/query/service/tests/it/interpreters/async_insert_queue.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ async fn test_async_insert_queue() -> Result<()> {
5151
{
5252
let query = "create table default.test(a int, b String) Engine = Memory;";
5353
let (plan, _, _) = planner.plan_sql(query).await?;
54-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
54+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
5555
let _ = executor.execute(ctx.clone()).await?;
5656
}
5757

@@ -170,7 +170,7 @@ async fn test_async_insert_queue_max_data_size() -> Result<()> {
170170
{
171171
let query = "create table default.test(a int, b String) Engine = Memory;";
172172
let (plan, _, _) = planner.plan_sql(query).await?;
173-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
173+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
174174
let _ = executor.execute(ctx.clone()).await?;
175175
}
176176

@@ -220,7 +220,7 @@ async fn test_async_insert_queue_busy_timeout() -> Result<()> {
220220
{
221221
let query = "create table default.test(a int, b String) Engine = Memory;";
222222
let (plan, _, _) = planner.plan_sql(query).await?;
223-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
223+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
224224
let _ = executor.execute(ctx.clone()).await?;
225225
}
226226

@@ -271,7 +271,7 @@ async fn test_async_insert_queue_stale_timeout() -> Result<()> {
271271
{
272272
let query = "create table default.test(a int, b String) Engine = Memory;";
273273
let (plan, _, _) = planner.plan_sql(query).await?;
274-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
274+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
275275
let _ = executor.execute(ctx.clone()).await?;
276276
}
277277

@@ -321,7 +321,7 @@ async fn test_async_insert_queue_wait_timeout() -> Result<()> {
321321
{
322322
let query = "create table default.test(a int, b String) Engine = Memory;";
323323
let (plan, _, _) = planner.plan_sql(query).await?;
324-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
324+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
325325
let _ = executor.execute(ctx.clone()).await?;
326326
}
327327

@@ -364,7 +364,7 @@ async fn test_async_insert_queue_no_wait() -> Result<()> {
364364
{
365365
let query = "create table default.test(a int, b String) Engine = Memory;";
366366
let (plan, _, _) = planner.plan_sql(query).await?;
367-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
367+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
368368
let _ = executor.execute(ctx.clone()).await?;
369369
}
370370

src/query/service/tests/it/interpreters/interpreter_call.rs

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async fn test_call_interpreter() -> Result<()> {
3333

3434
let query = "call system$test()";
3535
let (plan, _, _) = planner.plan_sql(query).await?;
36-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
36+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
3737
assert_eq!(executor.name(), "CallInterpreter");
3838
let res = executor.execute(ctx.clone()).await;
3939
assert_eq!(res.is_err(), true);
@@ -53,7 +53,7 @@ async fn test_call_fuse_snapshot_interpreter() -> Result<()> {
5353
{
5454
let query = "call system$fuse_snapshot()";
5555
let (plan, _, _) = planner.plan_sql(query).await?;
56-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
56+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
5757
assert_eq!(executor.name(), "CallInterpreter");
5858
let res = executor.execute(ctx.clone()).await;
5959
assert_eq!(res.is_err(), true);
@@ -65,7 +65,7 @@ async fn test_call_fuse_snapshot_interpreter() -> Result<()> {
6565
{
6666
let query = "call system$fuse_snapshot(default, test)";
6767
let (plan, _, _) = planner.plan_sql(query).await?;
68-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
68+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
6969
assert_eq!(executor.name(), "CallInterpreter");
7070
let res = executor.execute(ctx.clone()).await;
7171
assert_eq!(res.is_err(), true);
@@ -79,7 +79,7 @@ async fn test_call_fuse_snapshot_interpreter() -> Result<()> {
7979
{
8080
let query = "call system$fuse_snapshot(system, tables)";
8181
let (plan, _, _) = planner.plan_sql(query).await?;
82-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
82+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
8383
assert_eq!(executor.name(), "CallInterpreter");
8484
let res = executor.execute(ctx.clone()).await;
8585
assert_eq!(res.is_err(), true);
@@ -95,15 +95,15 @@ async fn test_call_fuse_snapshot_interpreter() -> Result<()> {
9595
";
9696

9797
let (plan, _, _) = planner.plan_sql(query).await?;
98-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
98+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
9999
let _ = executor.execute(ctx.clone()).await?;
100100
}
101101

102102
// FuseHistory
103103
{
104104
let query = "call system$fuse_snapshot(default, a)";
105105
let (plan, _, _) = planner.plan_sql(query).await?;
106-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
106+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
107107
let _ = executor.execute(ctx.clone()).await?;
108108
}
109109

@@ -119,7 +119,7 @@ async fn test_call_fuse_block_interpreter() -> Result<()> {
119119
{
120120
let query = "call system$fuse_block()";
121121
let (plan, _, _) = planner.plan_sql(query).await?;
122-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
122+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
123123
assert_eq!(executor.name(), "CallInterpreter");
124124
let res = executor.execute(ctx.clone()).await;
125125
assert_eq!(res.is_err(), true);
@@ -131,7 +131,7 @@ async fn test_call_fuse_block_interpreter() -> Result<()> {
131131
{
132132
let query = "call system$fuse_block(default, test)";
133133
let (plan, _, _) = planner.plan_sql(query).await?;
134-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
134+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
135135
assert_eq!(executor.name(), "CallInterpreter");
136136
let res = executor.execute(ctx.clone()).await;
137137
assert_eq!(res.is_err(), true);
@@ -145,7 +145,7 @@ async fn test_call_fuse_block_interpreter() -> Result<()> {
145145
{
146146
let query = "call system$fuse_block(system, tables)";
147147
let (plan, _, _) = planner.plan_sql(query).await?;
148-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
148+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
149149
assert_eq!(executor.name(), "CallInterpreter");
150150
let res = executor.execute(ctx.clone()).await;
151151
assert_eq!(res.is_err(), true);
@@ -161,15 +161,15 @@ async fn test_call_fuse_block_interpreter() -> Result<()> {
161161
";
162162

163163
let (plan, _, _) = planner.plan_sql(query).await?;
164-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
164+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
165165
let _ = executor.execute(ctx.clone()).await?;
166166
}
167167

168168
// fuse_block
169169
{
170170
let query = "call system$fuse_block(default, a)";
171171
let (plan, _, _) = planner.plan_sql(query).await?;
172-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
172+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
173173
let _ = executor.execute(ctx.clone()).await?;
174174
}
175175

@@ -185,7 +185,7 @@ async fn test_call_clustering_information_interpreter() -> Result<()> {
185185
{
186186
let query = "call system$clustering_information()";
187187
let (plan, _, _) = planner.plan_sql(query).await?;
188-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
188+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
189189
assert_eq!(executor.name(), "CallInterpreter");
190190
let res = executor.execute(ctx.clone()).await;
191191
assert_eq!(res.is_err(), true);
@@ -197,7 +197,7 @@ async fn test_call_clustering_information_interpreter() -> Result<()> {
197197
{
198198
let query = "call system$clustering_information(default, test)";
199199
let (plan, _, _) = planner.plan_sql(query).await?;
200-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
200+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
201201
assert_eq!(executor.name(), "CallInterpreter");
202202
let res = executor.execute(ctx.clone()).await;
203203
assert_eq!(res.is_err(), true);
@@ -211,7 +211,7 @@ async fn test_call_clustering_information_interpreter() -> Result<()> {
211211
{
212212
let query = "call system$clustering_information(system, tables)";
213213
let (plan, _, _) = planner.plan_sql(query).await?;
214-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
214+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
215215
assert_eq!(executor.name(), "CallInterpreter");
216216
let res = executor.execute(ctx.clone()).await;
217217
assert_eq!(res.is_err(), true);
@@ -227,15 +227,15 @@ async fn test_call_clustering_information_interpreter() -> Result<()> {
227227
";
228228

229229
let (plan, _, _) = planner.plan_sql(query).await?;
230-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
230+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
231231
let _ = executor.execute(ctx.clone()).await?;
232232
}
233233

234234
// Unclustered.
235235
{
236236
let query = "call system$clustering_information(default, a)";
237237
let (plan, _, _) = planner.plan_sql(query).await?;
238-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
238+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
239239
let res = executor.execute(ctx.clone()).await;
240240
assert_eq!(res.is_err(), true);
241241
let expect =
@@ -250,15 +250,15 @@ async fn test_call_clustering_information_interpreter() -> Result<()> {
250250
";
251251

252252
let (plan, _, _) = planner.plan_sql(query).await?;
253-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
253+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
254254
let _ = executor.execute(ctx.clone()).await?;
255255
}
256256

257257
// clustering_information
258258
{
259259
let query = "call system$clustering_information(default, b)";
260260
let (plan, _, _) = planner.plan_sql(query).await?;
261-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
261+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
262262
let _ = executor.execute(ctx.clone()).await?;
263263
}
264264

@@ -294,7 +294,7 @@ async fn test_call_tenant_quota_interpreter() -> Result<()> {
294294
{
295295
let query = "call admin$tenant_quota()";
296296
let (plan, _, _) = planner.plan_sql(query).await?;
297-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
297+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
298298
let stream = executor.execute(ctx.clone()).await?;
299299
let result = stream.try_collect::<Vec<_>>().await?;
300300
let expected = vec![
@@ -311,7 +311,7 @@ async fn test_call_tenant_quota_interpreter() -> Result<()> {
311311
{
312312
let query = "call admin$tenant_quota(tenant1)";
313313
let (plan, _, _) = planner.plan_sql(query).await?;
314-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
314+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
315315
let stream = executor.execute(ctx.clone()).await?;
316316
let result = stream.try_collect::<Vec<_>>().await?;
317317
let expected = vec![
@@ -328,7 +328,7 @@ async fn test_call_tenant_quota_interpreter() -> Result<()> {
328328
{
329329
let query = "call admin$tenant_quota(tenant1, 7, 5, 3, 3)";
330330
let (plan, _, _) = planner.plan_sql(query).await?;
331-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
331+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
332332
let stream = executor.execute(ctx.clone()).await?;
333333
let result = stream.try_collect::<Vec<_>>().await?;
334334
let expected = vec![
@@ -344,7 +344,7 @@ async fn test_call_tenant_quota_interpreter() -> Result<()> {
344344
{
345345
let query = "call admin$tenant_quota(tenant1, 8)";
346346
let (plan, _, _) = planner.plan_sql(query).await?;
347-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
347+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
348348
let stream = executor.execute(ctx.clone()).await?;
349349
let result = stream.try_collect::<Vec<_>>().await?;
350350
let expected = vec![
@@ -360,7 +360,7 @@ async fn test_call_tenant_quota_interpreter() -> Result<()> {
360360
{
361361
let query = "call admin$tenant_quota(tenant1)";
362362
let (plan, _, _) = planner.plan_sql(query).await?;
363-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
363+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
364364
let stream = executor.execute(ctx.clone()).await?;
365365
let result = stream.try_collect::<Vec<_>>().await?;
366366
let expected = vec![
@@ -377,7 +377,7 @@ async fn test_call_tenant_quota_interpreter() -> Result<()> {
377377
{
378378
let query = "call admin$tenant_quota()";
379379
let (plan, _, _) = planner.plan_sql(query).await?;
380-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
380+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
381381
let stream = executor.execute(ctx.clone()).await?;
382382
let result = stream.try_collect::<Vec<_>>().await?;
383383
let expected = vec![
@@ -402,7 +402,7 @@ async fn test_call_tenant_quote_without_management_mode() -> Result<()> {
402402
{
403403
let query = "call admin$tenant_quota(tenant1)";
404404
let (plan, _, _) = planner.plan_sql(query).await?;
405-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
405+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
406406
let res = executor.execute(ctx.clone()).await;
407407
assert_eq!(res.is_err(), true);
408408
let expect =

src/query/service/tests/it/interpreters/interpreter_cluster_key_alter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ async fn test_alter_table_cluster_key_interpreter() -> Result<()> {
3333
";
3434

3535
let (plan, _, _) = planner.plan_sql(query).await?;
36-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
36+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
3737
let _ = executor.execute(ctx.clone()).await?;
3838
}
3939

4040
// Add cluster key.
4141
{
4242
let query = "Alter TABLE a CLUSTER BY(a, b)";
4343
let (plan, _, _) = planner.plan_sql(query).await?;
44-
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
44+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan).await?;
4545
assert_eq!(executor.name(), "AlterTableClusterKeyInterpreter");
4646
let stream = executor.execute(ctx.clone()).await?;
4747
let result = stream.try_collect::<Vec<_>>().await?;

0 commit comments

Comments
 (0)