Skip to content

Commit 05abda9

Browse files
authored
Merge pull request #7773 from BohuTANG/dev-refine-mod
chore: move codes from mod.rs to x.rs make more module
2 parents 4943098 + 986a939 commit 05abda9

File tree

19 files changed

+1316
-1142
lines changed

19 files changed

+1316
-1142
lines changed

src/query/service/src/sessions/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ mod query_ctx_shared;
1919
mod session;
2020
mod session_ctx;
2121
mod session_info;
22-
#[allow(clippy::module_inception)]
2322
mod session_mgr;
2423
mod session_mgr_status;
2524
mod session_settings;
Lines changed: 369 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,369 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use common_ast::ast::ExplainKind;
18+
use common_ast::ast::Statement;
19+
use common_ast::parser::parse_sql;
20+
use common_ast::parser::tokenize_sql;
21+
use common_ast::Backtrace;
22+
use common_ast::Dialect;
23+
use common_ast::UDFValidator;
24+
use common_catalog::catalog::CatalogManager;
25+
use common_catalog::table_context::TableContext;
26+
use common_datavalues::DataTypeImpl;
27+
use common_exception::Result;
28+
use common_meta_types::UserDefinedFunction;
29+
use common_planner::plans::AlterUDFPlan;
30+
use common_planner::plans::CallPlan;
31+
use common_planner::plans::CreateRolePlan;
32+
use common_planner::plans::CreateUDFPlan;
33+
use common_planner::plans::DropRolePlan;
34+
use common_planner::plans::DropStagePlan;
35+
use common_planner::plans::DropUDFPlan;
36+
use common_planner::plans::DropUserPlan;
37+
use common_planner::plans::ShowGrantsPlan;
38+
use common_planner::plans::UseDatabasePlan;
39+
use common_planner::MetadataRef;
40+
41+
use crate::sql::plans::Plan;
42+
use crate::sql::plans::RewriteKind;
43+
use crate::sql::BindContext;
44+
use crate::sql::ColumnBinding;
45+
use crate::sql::NameResolutionContext;
46+
use crate::sql::Visibility;
47+
48+
/// Binder is responsible to transform AST of a query into a canonical logical SExpr.
49+
///
50+
/// During this phase, it will:
51+
/// - Resolve columns and tables with Catalog
52+
/// - Check semantic of query
53+
/// - Validate expressions
54+
/// - Build `Metadata`
55+
pub struct Binder {
56+
pub ctx: Arc<dyn TableContext>,
57+
pub catalogs: Arc<CatalogManager>,
58+
pub name_resolution_ctx: NameResolutionContext,
59+
pub metadata: MetadataRef,
60+
}
61+
62+
impl<'a> Binder {
63+
pub fn new(
64+
ctx: Arc<dyn TableContext>,
65+
catalogs: Arc<CatalogManager>,
66+
name_resolution_ctx: NameResolutionContext,
67+
metadata: MetadataRef,
68+
) -> Self {
69+
Binder {
70+
ctx,
71+
catalogs,
72+
name_resolution_ctx,
73+
metadata,
74+
}
75+
}
76+
77+
pub async fn bind(mut self, stmt: &Statement<'a>) -> Result<Plan> {
78+
let init_bind_context = BindContext::new();
79+
self.bind_statement(&init_bind_context, stmt).await
80+
}
81+
82+
#[async_recursion::async_recursion]
83+
pub(crate) async fn bind_statement(
84+
&mut self,
85+
bind_context: &BindContext,
86+
stmt: &Statement<'a>,
87+
) -> Result<Plan> {
88+
let plan = match stmt {
89+
Statement::Query(query) => {
90+
let (s_expr, bind_context) = self.bind_query(bind_context, query).await?;
91+
Plan::Query {
92+
s_expr: Box::new(s_expr),
93+
metadata: self.metadata.clone(),
94+
bind_context: Box::new(bind_context),
95+
rewrite_kind: None,
96+
}
97+
}
98+
99+
Statement::Explain { query, kind } => {
100+
match kind {
101+
ExplainKind::Ast(formatted_stmt) => Plan::ExplainAst { formatted_string: formatted_stmt.clone() },
102+
ExplainKind::Syntax(formatted_sql) => Plan::ExplainSyntax { formatted_sql: formatted_sql.clone() },
103+
_ => Plan::Explain { kind: kind.clone(), plan: Box::new(self.bind_statement(bind_context, query).await?) },
104+
}
105+
}
106+
107+
Statement::ShowFunctions { limit } => {
108+
self.bind_show_functions(bind_context, limit).await?
109+
}
110+
111+
Statement::Copy(stmt) => self.bind_copy(bind_context, stmt).await?,
112+
113+
Statement::ShowMetrics => {
114+
self.bind_rewrite_to_query(
115+
bind_context,
116+
"SELECT metric, kind, labels, value FROM system.metrics",
117+
RewriteKind::ShowMetrics
118+
)
119+
.await?
120+
}
121+
Statement::ShowProcessList => {
122+
self.bind_rewrite_to_query(bind_context, "SELECT * FROM system.processes", RewriteKind::ShowProcessList)
123+
.await?
124+
}
125+
Statement::ShowEngines => {
126+
self.bind_rewrite_to_query(bind_context, "SELECT \"Engine\", \"Comment\" FROM system.engines ORDER BY \"Engine\" ASC", RewriteKind::ShowEngines)
127+
.await?
128+
},
129+
Statement::ShowSettings { like } => self.bind_show_settings(bind_context, like).await?,
130+
131+
// Databases
132+
Statement::ShowDatabases(stmt) => self.bind_show_databases(bind_context, stmt).await?,
133+
Statement::ShowCreateDatabase(stmt) => self.bind_show_create_database(stmt).await?,
134+
Statement::CreateDatabase(stmt) => self.bind_create_database(stmt).await?,
135+
Statement::DropDatabase(stmt) => self.bind_drop_database(stmt).await?,
136+
Statement::UndropDatabase(stmt) => self.bind_undrop_database(stmt).await?,
137+
Statement::AlterDatabase(stmt) => self.bind_alter_database(stmt).await?,
138+
Statement::UseDatabase { database } => {
139+
Plan::UseDatabase(Box::new(UseDatabasePlan {
140+
database: database.name.clone(),
141+
}))
142+
}
143+
// Tables
144+
Statement::ShowTables(stmt) => self.bind_show_tables(bind_context, stmt).await?,
145+
Statement::ShowCreateTable(stmt) => self.bind_show_create_table(stmt).await?,
146+
Statement::DescribeTable(stmt) => self.bind_describe_table(stmt).await?,
147+
Statement::ShowTablesStatus(stmt) => {
148+
self.bind_show_tables_status(bind_context, stmt).await?
149+
}
150+
Statement::CreateTable(stmt) => self.bind_create_table(stmt).await?,
151+
Statement::DropTable(stmt) => self.bind_drop_table(stmt).await?,
152+
Statement::UndropTable(stmt) => self.bind_undrop_table(stmt).await?,
153+
Statement::AlterTable(stmt) => self.bind_alter_table(bind_context, stmt).await?,
154+
Statement::RenameTable(stmt) => self.bind_rename_table(stmt).await?,
155+
Statement::TruncateTable(stmt) => self.bind_truncate_table(stmt).await?,
156+
Statement::OptimizeTable(stmt) => self.bind_optimize_table(stmt).await?,
157+
Statement::ExistsTable(stmt) => self.bind_exists_table(stmt).await?,
158+
159+
// Views
160+
Statement::CreateView(stmt) => self.bind_create_view(stmt).await?,
161+
Statement::AlterView(stmt) => self.bind_alter_view(stmt).await?,
162+
Statement::DropView(stmt) => self.bind_drop_view(stmt).await?,
163+
164+
// Users
165+
Statement::CreateUser(stmt) => self.bind_create_user(stmt).await?,
166+
Statement::DropUser { if_exists, user } => Plan::DropUser(Box::new(DropUserPlan {
167+
if_exists: *if_exists,
168+
user: user.clone(),
169+
})),
170+
Statement::ShowUsers => self.bind_rewrite_to_query(bind_context, "SELECT name, hostname, auth_type, auth_string FROM system.users ORDER BY name", RewriteKind::ShowUsers).await?,
171+
Statement::AlterUser(stmt) => self.bind_alter_user(stmt).await?,
172+
173+
// Roles
174+
Statement::ShowRoles => self.bind_rewrite_to_query(bind_context, "SELECT name, inherited_roles FROM system.roles ORDER BY name", RewriteKind::ShowRoles).await?,
175+
Statement::CreateRole {
176+
if_not_exists,
177+
role_name,
178+
} => Plan::CreateRole(Box::new(CreateRolePlan {
179+
if_not_exists: *if_not_exists,
180+
role_name: role_name.to_string(),
181+
})),
182+
Statement::DropRole {
183+
if_exists,
184+
role_name,
185+
} => Plan::DropRole(Box::new(DropRolePlan {
186+
if_exists: *if_exists,
187+
role_name: role_name.to_string(),
188+
})),
189+
190+
// Stages
191+
Statement::ShowStages => self.bind_rewrite_to_query(bind_context, "SELECT name, stage_type, number_of_files, creator, comment FROM system.stages ORDER BY name", RewriteKind::ShowStages).await?,
192+
Statement::ListStage { location, pattern } => {
193+
self.bind_list_stage(location, pattern).await?
194+
}
195+
Statement::DescribeStage { stage_name } => self.bind_rewrite_to_query(bind_context, format!("SELECT * FROM system.stages WHERE name = '{stage_name}'").as_str(), RewriteKind::DescribeStage).await?,
196+
Statement::CreateStage(stmt) => self.bind_create_stage(stmt).await?,
197+
Statement::DropStage {
198+
stage_name,
199+
if_exists,
200+
} => Plan::DropStage(Box::new(DropStagePlan {
201+
if_exists: *if_exists,
202+
name: stage_name.clone(),
203+
})),
204+
Statement::RemoveStage { location, pattern } => {
205+
self.bind_remove_stage(location, pattern).await?
206+
}
207+
Statement::Insert(stmt) => self.bind_insert(bind_context, stmt).await?,
208+
Statement::Delete {
209+
table_reference,
210+
selection,
211+
} => {
212+
self.bind_delete(bind_context, table_reference, selection)
213+
.await?
214+
}
215+
216+
// Permissions
217+
Statement::Grant(stmt) => self.bind_grant(stmt).await?,
218+
Statement::ShowGrants { principal } => Plan::ShowGrants(Box::new(ShowGrantsPlan {
219+
principal: principal.clone(),
220+
})),
221+
Statement::Revoke(stmt) => self.bind_revoke(stmt).await?,
222+
223+
// UDFs
224+
Statement::CreateUDF {
225+
if_not_exists,
226+
udf_name,
227+
parameters,
228+
definition,
229+
description,
230+
} => {
231+
let mut validator = UDFValidator {
232+
name : udf_name.to_string(),
233+
parameters: parameters.iter().map(|v| v.to_string()).collect(),
234+
..Default::default()
235+
};
236+
validator.verify_definition_expr(definition)?;
237+
let udf = UserDefinedFunction {
238+
name: validator.name,
239+
parameters: validator.parameters,
240+
definition: definition.to_string(),
241+
description: description.clone().unwrap_or_default(),
242+
};
243+
244+
Plan::CreateUDF(Box::new(CreateUDFPlan {
245+
if_not_exists: *if_not_exists,
246+
udf
247+
}))
248+
},
249+
Statement::AlterUDF {
250+
udf_name,
251+
parameters,
252+
definition,
253+
description,
254+
} => {
255+
let mut validator = UDFValidator {
256+
name : udf_name.to_string(),
257+
parameters: parameters.iter().map(|v| v.to_string()).collect(),
258+
..Default::default()
259+
};
260+
validator.verify_definition_expr(definition)?;
261+
let udf = UserDefinedFunction {
262+
name: validator.name,
263+
parameters: validator.parameters,
264+
definition: definition.to_string(),
265+
description: description.clone().unwrap_or_default(),
266+
};
267+
268+
Plan::AlterUDF(Box::new(AlterUDFPlan {
269+
udf,
270+
}))
271+
}
272+
Statement::DropUDF {
273+
if_exists,
274+
udf_name,
275+
} => Plan::DropUDF(Box::new(DropUDFPlan {
276+
if_exists: *if_exists,
277+
name: udf_name.to_string(),
278+
})),
279+
Statement::Call(stmt) => Plan::Call(Box::new(CallPlan {
280+
name: stmt.name.clone(),
281+
args: stmt.args.clone(),
282+
})),
283+
284+
Statement::Presign(stmt) => self.bind_presign(bind_context, stmt).await?,
285+
286+
Statement::SetVariable {
287+
is_global,
288+
variable,
289+
value,
290+
} => {
291+
self.bind_set_variable(bind_context, *is_global, variable, value)
292+
.await?
293+
}
294+
Statement::KillStmt { kill_target, object_id } => {
295+
self.bind_kill_stmt(bind_context, kill_target, object_id.as_str())
296+
.await?
297+
}
298+
299+
// share statements
300+
Statement::CreateShare(stmt) => {
301+
self.bind_create_share(stmt).await?
302+
}
303+
Statement::DropShare(stmt) => {
304+
self.bind_drop_share(stmt).await?
305+
}
306+
Statement::GrantShareObject(stmt) => {
307+
self.bind_grant_share_object(stmt).await?
308+
}
309+
Statement::RevokeShareObject(stmt) => {
310+
self.bind_revoke_share_object(stmt).await?
311+
}
312+
Statement::AlterShareTenants(stmt) => {
313+
self.bind_alter_share_accounts(stmt).await?
314+
}
315+
Statement::DescShare(stmt) => {
316+
self.bind_desc_share(stmt).await?
317+
}
318+
Statement::ShowShares(stmt) => {
319+
self.bind_show_shares(stmt).await?
320+
}
321+
Statement::ShowObjectGrantPrivileges(stmt) => {
322+
self.bind_show_object_grant_privileges(stmt).await?
323+
}
324+
Statement::ShowGrantsOfShare(stmt) => {
325+
self.bind_show_grants_of_share(stmt).await?
326+
}
327+
};
328+
Ok(plan)
329+
}
330+
331+
pub(crate) async fn bind_rewrite_to_query(
332+
&mut self,
333+
bind_context: &BindContext,
334+
query: &str,
335+
rewrite_kind_r: RewriteKind,
336+
) -> Result<Plan> {
337+
let tokens = tokenize_sql(query)?;
338+
let backtrace = Backtrace::new();
339+
let (stmt, _) = parse_sql(&tokens, Dialect::PostgreSQL, &backtrace)?;
340+
let mut plan = self.bind_statement(bind_context, &stmt).await?;
341+
342+
if let Plan::Query { rewrite_kind, .. } = &mut plan {
343+
*rewrite_kind = Some(rewrite_kind_r)
344+
}
345+
Ok(plan)
346+
}
347+
348+
/// Create a new ColumnBinding with assigned index
349+
pub(crate) fn create_column_binding(
350+
&mut self,
351+
database_name: Option<String>,
352+
table_name: Option<String>,
353+
column_name: String,
354+
data_type: DataTypeImpl,
355+
) -> ColumnBinding {
356+
let index =
357+
self.metadata
358+
.write()
359+
.add_column(column_name.clone(), data_type.clone(), None, None);
360+
ColumnBinding {
361+
database_name,
362+
table_name,
363+
column_name,
364+
index,
365+
data_type: Box::new(data_type),
366+
visibility: Visibility::Visible,
367+
}
368+
}
369+
}

0 commit comments

Comments
 (0)