Skip to content

Commit 67761e6

Browse files
authored
Merge branch 'main' into fix-cast-deterministic
2 parents ad9f4bd + 596e2d6 commit 67761e6

File tree

8 files changed

+95
-41
lines changed

8 files changed

+95
-41
lines changed

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ fmt:
1818

1919
lint:
2020
cargo fmt --all
21+
cargo clippy --workspace --all-targets -- -D warnings
22+
2123
# Cargo.toml file formatter(make setup to install)
2224
taplo fmt
2325
# Python file formatter(make setup to install)
2426
yapf -ri tests/
2527
# Bash file formatter(make setup to install)
2628
shfmt -l -w scripts/*
27-
cargo clippy --workspace --all-targets -- -D warnings
2829

2930
lint-yaml:
3031
yamllint -f auto .
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
use std::sync::Arc;
17+
18+
use common_exception::Result;
19+
use common_legacy_planners::PlanNode;
20+
21+
use crate::interpreters::ManagementModeAccess;
22+
use crate::sessions::QueryContext;
23+
use crate::sql::plans::Plan;
24+
25+
pub trait AccessChecker: Sync + Send {
26+
// Check the access permission for the old plan.
27+
// TODO(bohu): Remove after new plan done.
28+
fn check(&self, plan: &PlanNode) -> Result<()>;
29+
30+
// Check the access permission for the old plan.
31+
fn check_new(&self, _plan: &Plan) -> Result<()>;
32+
}
33+
34+
pub struct Accessor {
35+
accessors: HashMap<String, Box<dyn AccessChecker>>,
36+
}
37+
38+
impl Accessor {
39+
pub fn create(ctx: Arc<QueryContext>) -> Self {
40+
let mut accessors: HashMap<String, Box<dyn AccessChecker>> = Default::default();
41+
accessors.insert("management".to_string(), ManagementModeAccess::create(ctx));
42+
Accessor { accessors }
43+
}
44+
45+
pub fn check(&self, plan: &PlanNode) -> Result<()> {
46+
for accessor in self.accessors.values() {
47+
accessor.check(plan)?;
48+
}
49+
Ok(())
50+
}
51+
52+
pub fn check_new(&self, plan: &Plan) -> Result<()> {
53+
for accessor in self.accessors.values() {
54+
accessor.check_new(plan)?;
55+
}
56+
Ok(())
57+
}
58+
}

src/query/service/src/interpreters/access/management_mode_access.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use common_exception::ErrorCode;
1818
use common_exception::Result;
1919
use common_legacy_planners::PlanNode;
2020

21+
use crate::interpreters::access::AccessChecker;
2122
use crate::sessions::QueryContext;
2223
use crate::sessions::TableContext;
2324
use crate::sql::plans::Plan;
@@ -27,12 +28,14 @@ pub struct ManagementModeAccess {
2728
}
2829

2930
impl ManagementModeAccess {
30-
pub fn create(ctx: Arc<QueryContext>) -> Self {
31-
ManagementModeAccess { ctx }
31+
pub fn create(ctx: Arc<QueryContext>) -> Box<dyn AccessChecker> {
32+
Box::new(ManagementModeAccess { ctx })
3233
}
34+
}
3335

36+
impl AccessChecker for ManagementModeAccess {
3437
// Check what we can do if in management mode.
35-
pub fn check(&self, plan: &PlanNode) -> Result<()> {
38+
fn check(&self, plan: &PlanNode) -> Result<()> {
3639
// Allows for management-mode.
3740
if self.ctx.get_config().query.management_mode {
3841
return match plan {
@@ -47,7 +50,7 @@ impl ManagementModeAccess {
4750
}
4851

4952
// Check what we can do if in management mode.
50-
pub fn check_new(&self, plan: &Plan) -> Result<()> {
53+
fn check_new(&self, plan: &Plan) -> Result<()> {
5154
// Allows for management-mode.
5255
if self.ctx.get_config().query.management_mode {
5356
let ok = match plan {

src/query/service/src/interpreters/access/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod accessor;
1516
mod management_mode_access;
1617

18+
pub use accessor::AccessChecker;
19+
pub use accessor::Accessor;
1720
pub use management_mode_access::ManagementModeAccess;

src/query/service/src/interpreters/interpreter_factory.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use common_exception::ErrorCode;
1818
use common_exception::Result;
1919
use common_legacy_planners::PlanNode;
2020

21+
use crate::interpreters::access::Accessor;
2122
use crate::interpreters::DeleteInterpreter;
2223
use crate::interpreters::EmptyInterpreter;
2324
use crate::interpreters::ExplainInterpreter;
@@ -35,10 +36,14 @@ pub struct InterpreterFactory;
3536
/// Such as: SelectPlan -> SelectInterpreter, ExplainPlan -> ExplainInterpreter, ...
3637
impl InterpreterFactory {
3738
pub fn get(ctx: Arc<QueryContext>, plan: PlanNode) -> Result<Arc<dyn Interpreter>> {
39+
// Check the access permission.
40+
let access_checker = Accessor::create(ctx.clone());
41+
access_checker.check(&plan)?;
42+
3843
let inner = Self::create_interpreter(ctx.clone(), &plan)?;
3944
let query_kind = plan.name().to_string();
4045
Ok(Arc::new(InterceptorInterpreter::create(
41-
ctx, inner, plan, None, query_kind,
46+
ctx, inner, query_kind,
4247
)))
4348
}
4449

src/query/service/src/interpreters/interpreter_factory_interceptor.rs

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@ use std::time::SystemTime;
1717

1818
use common_datavalues::DataSchemaRef;
1919
use common_exception::Result;
20-
use common_legacy_planners::PlanNode;
2120
use common_streams::ErrorStream;
2221
use common_streams::ProgressStream;
2322
use common_streams::SendableDataBlockStream;
2423
use parking_lot::Mutex;
2524

26-
use crate::interpreters::access::ManagementModeAccess;
2725
use crate::interpreters::Interpreter;
2826
use crate::interpreters::InterpreterPtr;
2927
use crate::interpreters::InterpreterQueryLog;
@@ -32,34 +30,21 @@ use crate::pipelines::SourcePipeBuilder;
3230
use crate::sessions::QueryContext;
3331
use crate::sessions::SessionManager;
3432
use crate::sessions::TableContext;
35-
use crate::sql::plans::Plan;
3633

3734
pub struct InterceptorInterpreter {
3835
ctx: Arc<QueryContext>,
39-
plan: PlanNode,
40-
new_plan: Option<Plan>,
4136
inner: InterpreterPtr,
4237
query_log: InterpreterQueryLog,
4338
source_pipe_builder: Mutex<Option<SourcePipeBuilder>>,
44-
management_mode_access: ManagementModeAccess,
4539
}
4640

4741
impl InterceptorInterpreter {
48-
pub fn create(
49-
ctx: Arc<QueryContext>,
50-
inner: InterpreterPtr,
51-
plan: PlanNode,
52-
new_plan: Option<Plan>,
53-
query_kind: String,
54-
) -> Self {
42+
pub fn create(ctx: Arc<QueryContext>, inner: InterpreterPtr, query_kind: String) -> Self {
5543
InterceptorInterpreter {
5644
ctx: ctx.clone(),
57-
plan,
58-
new_plan,
5945
inner,
60-
query_log: InterpreterQueryLog::create(ctx.clone(), query_kind),
46+
query_log: InterpreterQueryLog::create(ctx, query_kind),
6147
source_pipe_builder: Mutex::new(None),
62-
management_mode_access: ManagementModeAccess::create(ctx),
6348
}
6449
}
6550
}
@@ -75,12 +60,6 @@ impl Interpreter for InterceptorInterpreter {
7560
}
7661

7762
async fn execute(&self, ctx: Arc<QueryContext>) -> Result<SendableDataBlockStream> {
78-
// Management mode access check.
79-
match &self.new_plan {
80-
Some(p) => self.management_mode_access.check_new(p)?,
81-
_ => self.management_mode_access.check(&self.plan)?,
82-
}
83-
8463
let _ = self
8564
.inner
8665
.set_source_pipe_builder((*self.source_pipe_builder.lock()).clone());

src/query/service/src/interpreters/interpreter_factory_v2.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
use std::sync::Arc;
1616

1717
use common_exception::Result;
18-
use common_legacy_planners::EmptyPlan;
19-
use common_legacy_planners::PlanNode;
2018

2119
use super::interpreter_share_desc::DescShareInterpreter;
2220
use super::interpreter_user_stage_drop::DropUserStageInterpreter;
2321
use super::*;
22+
use crate::interpreters::access::Accessor;
2423
use crate::interpreters::interpreter_copy_v2::CopyInterpreterV2;
2524
use crate::interpreters::interpreter_presign::PresignInterpreter;
2625
use crate::interpreters::interpreter_table_create_v2::CreateTableInterpreterV2;
@@ -44,13 +43,15 @@ impl InterpreterFactoryV2 {
4443
}
4544

4645
pub fn get(ctx: Arc<QueryContext>, plan: &Plan) -> Result<InterpreterPtr> {
46+
// Check the access permission.
47+
let access_checker = Accessor::create(ctx.clone());
48+
access_checker.check_new(plan)?;
49+
4750
let inner = InterpreterFactoryV2::create_interpreter(ctx.clone(), plan)?;
4851

4952
Ok(Arc::new(InterceptorInterpreter::create(
5053
ctx,
5154
inner,
52-
PlanNode::Empty(EmptyPlan::create()),
53-
Some(plan.clone()),
5455
plan.to_string(),
5556
)))
5657
}

src/query/service/tests/it/interpreters/access/management_mode_access.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,18 @@ async fn test_management_mode_access() -> Result<()> {
158158
for group in groups {
159159
for test in group.tests {
160160
let (plan, _, _) = planner.plan_sql(test.query).await?;
161-
let interpreter = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
162-
let res = interpreter.execute(ctx.clone()).await;
163-
assert_eq!(
164-
test.is_err,
165-
res.is_err(),
166-
"in test case:{:?}",
167-
(group.name, test.name)
168-
);
161+
if test.is_err {
162+
let res = InterpreterFactoryV2::get(ctx.clone(), &plan);
163+
assert_eq!(
164+
test.is_err,
165+
res.is_err(),
166+
"in test case:{:?}",
167+
(group.name, test.name)
168+
);
169+
} else {
170+
let interpreter = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
171+
interpreter.execute(ctx.clone()).await?;
172+
}
169173
}
170174
}
171175

0 commit comments

Comments
 (0)