Skip to content

Commit f40b8c3

Browse files
authored
feat: Support USE CATALOG syntax and current_catalog() function (#16926)
* working * add ut * format * add http session catalog& add ut * fix tests
1 parent b58d1eb commit f40b8c3

File tree

22 files changed

+221
-1
lines changed

22 files changed

+221
-1
lines changed

src/query/ast/src/ast/statements/statement.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ pub enum Statement {
124124
ShowCreateCatalog(ShowCreateCatalogStmt),
125125
CreateCatalog(CreateCatalogStmt),
126126
DropCatalog(DropCatalogStmt),
127+
UseCatalog {
128+
catalog: Identifier,
129+
},
127130

128131
// Databases
129132
ShowDatabases(ShowDatabasesStmt),
@@ -410,6 +413,7 @@ impl Statement {
410413
| Statement::Update(..)
411414
| Statement::ShowCatalogs(..)
412415
| Statement::ShowCreateCatalog(..)
416+
| Statement::UseCatalog { .. }
413417
| Statement::ShowDatabases(..)
414418
| Statement::ShowDropDatabases(..)
415419
| Statement::ShowCreateDatabase(..)
@@ -718,6 +722,7 @@ impl Display for Statement {
718722
Statement::ShowCreateCatalog(stmt) => write!(f, "{stmt}")?,
719723
Statement::CreateCatalog(stmt) => write!(f, "{stmt}")?,
720724
Statement::DropCatalog(stmt) => write!(f, "{stmt}")?,
725+
Statement::UseCatalog { catalog } => write!(f, "USE CATALOG {catalog}")?,
721726
Statement::ShowDatabases(stmt) => write!(f, "{stmt}")?,
722727
Statement::ShowDropDatabases(stmt) => write!(f, "{stmt}")?,
723728
Statement::ShowCreateDatabase(stmt) => write!(f, "{stmt}")?,

src/query/ast/src/parser/statement.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,12 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
516516
})
517517
},
518518
);
519+
let use_catalog = map(
520+
rule! {
521+
USE ~ CATALOG ~ #ident
522+
},
523+
|(_, _, catalog)| Statement::UseCatalog { catalog },
524+
);
519525

520526
let show_databases = map(
521527
rule! {
@@ -2281,6 +2287,11 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
22812287
| #set_priority: "`SET PRIORITY (HIGH | MEDIUM | LOW) <object_id>`"
22822288
| #system_action: "`SYSTEM (ENABLE | DISABLE) EXCEPTION_BACKTRACE`"
22832289
),
2290+
// use
2291+
rule!(
2292+
#use_catalog: "`USE CATALOG <catalog>`"
2293+
| #use_database : "`USE <database>`"
2294+
),
22842295
// database
22852296
rule!(
22862297
#show_databases : "`SHOW [FULL] DATABASES [(FROM | IN) <catalog>] [<show_limit>]`"
@@ -2290,7 +2301,6 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
22902301
| #create_database : "`CREATE [OR REPLACE] DATABASE [IF NOT EXISTS] <database> [ENGINE = <engine>]`"
22912302
| #drop_database : "`DROP DATABASE [IF EXISTS] <database>`"
22922303
| #alter_database : "`ALTER DATABASE [IF EXISTS] <action>`"
2293-
| #use_database : "`USE <database>`"
22942304
),
22952305
// network policy / password policy
22962306
rule!(

src/query/ast/tests/it/parser.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ fn test_statement() {
137137
r#"drop table if exists a."b";"#,
138138
r#"use "a";"#,
139139
r#"create catalog ctl type=hive connection=(url='<hive-meta-store>' thrift_protocol='binary');"#,
140+
r#"select current_catalog();"#,
141+
r#"use catalog ctl;"#,
140142
r#"create database if not exists a;"#,
141143
r#"create database ctl.t engine = Default;"#,
142144
r#"create database t engine = Default;"#,

src/query/ast/tests/it/testdata/stmt.txt

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2954,6 +2954,83 @@ CreateCatalog(
29542954
)
29552955

29562956

2957+
---------- Input ----------
2958+
select current_catalog();
2959+
---------- Output ---------
2960+
SELECT current_catalog()
2961+
---------- AST ------------
2962+
Query(
2963+
Query {
2964+
span: Some(
2965+
0..24,
2966+
),
2967+
with: None,
2968+
body: Select(
2969+
SelectStmt {
2970+
span: Some(
2971+
0..24,
2972+
),
2973+
hints: None,
2974+
distinct: false,
2975+
top_n: None,
2976+
select_list: [
2977+
AliasedExpr {
2978+
expr: FunctionCall {
2979+
span: Some(
2980+
7..24,
2981+
),
2982+
func: FunctionCall {
2983+
distinct: false,
2984+
name: Identifier {
2985+
span: Some(
2986+
7..22,
2987+
),
2988+
name: "current_catalog",
2989+
quote: None,
2990+
ident_type: None,
2991+
},
2992+
args: [],
2993+
params: [],
2994+
window: None,
2995+
lambda: None,
2996+
},
2997+
},
2998+
alias: None,
2999+
},
3000+
],
3001+
from: [],
3002+
selection: None,
3003+
group_by: None,
3004+
having: None,
3005+
window_list: None,
3006+
qualify: None,
3007+
},
3008+
),
3009+
order_by: [],
3010+
limit: [],
3011+
offset: None,
3012+
ignore_result: false,
3013+
},
3014+
)
3015+
3016+
3017+
---------- Input ----------
3018+
use catalog ctl;
3019+
---------- Output ---------
3020+
USE CATALOG ctl
3021+
---------- AST ------------
3022+
UseCatalog {
3023+
catalog: Identifier {
3024+
span: Some(
3025+
12..15,
3026+
),
3027+
name: "ctl",
3028+
quote: None,
3029+
ident_type: None,
3030+
},
3031+
}
3032+
3033+
29573034
---------- Input ----------
29583035
create database if not exists a;
29593036
---------- Output ---------

src/query/service/src/interpreters/access/privilege_access.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1182,6 +1182,7 @@ impl AccessChecker for PrivilegeAccess {
11821182
Plan::ShowCreateCatalog(_)
11831183
| Plan::CreateCatalog(_)
11841184
| Plan::DropCatalog(_)
1185+
| Plan::UseCatalog(_)
11851186
| Plan::CreateFileFormat(_)
11861187
| Plan::DropFileFormat(_)
11871188
| Plan::ShowFileFormats(_)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_exception::ErrorCode;
18+
use databend_common_exception::Result;
19+
use databend_common_expression::DataSchema;
20+
use databend_common_sql::plans::UseCatalogPlan;
21+
22+
use crate::interpreters::Interpreter;
23+
use crate::pipelines::PipelineBuildResult;
24+
use crate::sessions::QueryAffect;
25+
use crate::sessions::QueryContext;
26+
27+
pub struct UseCatalogInterpreter {
28+
ctx: Arc<QueryContext>,
29+
plan: UseCatalogPlan,
30+
}
31+
32+
impl UseCatalogInterpreter {
33+
pub fn create(ctx: Arc<QueryContext>, plan: UseCatalogPlan) -> Self {
34+
UseCatalogInterpreter { ctx, plan }
35+
}
36+
}
37+
38+
#[async_trait::async_trait]
39+
impl Interpreter for UseCatalogInterpreter {
40+
fn name(&self) -> &str {
41+
"UseCatalogInterpreter"
42+
}
43+
44+
fn is_ddl(&self) -> bool {
45+
false
46+
}
47+
48+
#[async_backtrace::framed]
49+
async fn execute2(&self) -> Result<PipelineBuildResult> {
50+
if self.plan.catalog.trim().is_empty() {
51+
return Err(ErrorCode::UnknownCatalog("No catalog selected"));
52+
}
53+
self.ctx
54+
.set_current_catalog(self.plan.catalog.clone())
55+
.await?;
56+
self.ctx.set_affect(QueryAffect::UseCatalog {
57+
name: self.plan.catalog.clone(),
58+
});
59+
let _schema = Arc::new(DataSchema::empty());
60+
Ok(PipelineBuildResult::create())
61+
}
62+
}

src/query/service/src/interpreters/interpreter_factory.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ impl InterpreterFactory {
174174
Plan::DropCatalog(plan) => {
175175
Ok(Arc::new(DropCatalogInterpreter::create(ctx, *plan.clone())))
176176
}
177+
Plan::UseCatalog(plan) => {
178+
Ok(Arc::new(UseCatalogInterpreter::create(ctx, *plan.clone())))
179+
}
177180

178181
// Databases
179182
Plan::ShowCreateDatabase(show_create_database) => Ok(Arc::new(

src/query/service/src/interpreters/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod interpreter;
1919
mod interpreter_catalog_create;
2020
mod interpreter_catalog_drop;
2121
mod interpreter_catalog_show_create;
22+
mod interpreter_catalog_use;
2223
mod interpreter_cluster_key_alter;
2324
mod interpreter_cluster_key_drop;
2425
mod interpreter_clustering_history;
@@ -153,6 +154,7 @@ pub use hook::HookOperator;
153154
pub use interpreter::interpreter_plan_sql;
154155
pub use interpreter::Interpreter;
155156
pub use interpreter::InterpreterPtr;
157+
pub use interpreter_catalog_use::UseCatalogInterpreter;
156158
pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter;
157159
pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
158160
pub use interpreter_clustering_history::InterpreterClusteringHistory;

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ pub struct Executor {
146146
// may store these new session state, and pass it to the next http query request.
147147
#[derive(Debug, Clone)]
148148
pub struct ExecutorSessionState {
149+
pub current_catalog: String,
149150
pub current_database: String,
150151
pub current_role: Option<String>,
151152
pub secondary_roles: Option<Vec<String>>,
@@ -158,6 +159,7 @@ pub struct ExecutorSessionState {
158159
impl ExecutorSessionState {
159160
pub fn new(session: Arc<Session>) -> Self {
160161
Self {
162+
current_catalog: session.get_current_catalog(),
161163
current_database: session.get_current_database(),
162164
current_role: session.get_current_role().map(|r| r.name),
163165
secondary_roles: session.get_secondary_roles(),

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,8 @@ where D: Deserializer<'de> {
262262

263263
#[derive(Deserialize, Serialize, Debug, Default, Clone, Eq, PartialEq)]
264264
pub struct HttpSessionConf {
265+
#[serde(skip_serializing_if = "Option::is_none")]
266+
pub catalog: Option<String>,
265267
#[serde(skip_serializing_if = "Option::is_none")]
266268
pub database: Option<String>,
267269
#[serde(skip_serializing_if = "Option::is_none")]
@@ -412,10 +414,14 @@ impl HttpQuery {
412414

413415
// Read the session variables in the request, and set them to the current session.
414416
// the session variables includes:
417+
// - the current catalog
415418
// - the current database
416419
// - the current role
417420
// - the session-level settings, like max_threads, http_handler_result_timeout_secs, etc.
418421
if let Some(session_conf) = &request.session {
422+
if let Some(catalog) = &session_conf.catalog {
423+
session.set_current_catalog(catalog.clone());
424+
}
419425
if let Some(db) = &session_conf.database {
420426
session.set_current_database(db.clone());
421427
}
@@ -649,6 +655,7 @@ impl HttpQuery {
649655
.filter(|item| matches!(item.level, ScopeLevel::Session))
650656
.map(|item| (item.name.to_string(), item.user_value.as_string()))
651657
.collect::<BTreeMap<_, _>>();
658+
let catalog = session_state.current_catalog.clone();
652659
let database = session_state.current_database.clone();
653660
let role = session_state.current_role.clone();
654661
let secondary_roles = session_state.secondary_roles.clone();
@@ -721,6 +728,7 @@ impl HttpQuery {
721728
let need_keep_alive = need_sticky || has_temp_table;
722729

723730
Ok(HttpSessionConf {
731+
catalog: Some(catalog),
724732
database: Some(database),
725733
role,
726734
secondary_roles,

0 commit comments

Comments
 (0)