
Commit a29cd66

chore: move codes from mod.rs to make more module
1 parent c90718e commit a29cd66

2 files changed: +136 −119 lines changed


src/query/service/src/sql/planner/mod.rs

Lines changed: 7 additions & 119 deletions
@@ -12,133 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::Arc;
-
-use common_ast::parser::parse_sql;
-use common_ast::parser::token::Token;
-use common_ast::parser::token::TokenKind;
-use common_ast::parser::token::Tokenizer;
-use common_ast::Backtrace;
-use common_exception::Result;
-use parking_lot::RwLock;
-pub use plans::ScalarExpr;
-
-use crate::sql::optimizer::optimize;
-pub use crate::sql::planner::binder::BindContext;
-
-pub(crate) mod binder;
 mod format;
 mod metadata;
+mod plan;
+mod semantic;
+
+pub(crate) mod binder;
 pub mod optimizer;
 pub mod plans;
-mod semantic;
 
+pub use binder::BindContext;
 pub use binder::Binder;
 pub use binder::ColumnBinding;
 pub use binder::Visibility;
-use common_catalog::catalog::CatalogManager;
-use common_planner::Metadata;
-use common_planner::MetadataRef;
+pub use plan::Planner;
+pub use plans::ScalarExpr;
 pub use semantic::normalize_identifier;
 pub use semantic::IdentifierNormalizer;
 pub use semantic::NameResolutionContext;
-
-use self::plans::Plan;
-use super::optimizer::OptimizerConfig;
-use super::optimizer::OptimizerContext;
-use crate::catalogs::CatalogManagerHelper;
-use crate::sessions::TableContext;
-
-const PROBE_INSERT_INITIAL_TOKENS: usize = 128;
-const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8;
-
-pub struct Planner {
-    ctx: Arc<dyn TableContext>,
-}
-
-impl Planner {
-    pub fn new(ctx: Arc<dyn TableContext>) -> Self {
-        Planner { ctx }
-    }
-
-    pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, MetadataRef, Option<String>)> {
-        let settings = self.ctx.get_settings();
-        let sql_dialect = settings.get_sql_dialect()?;
-
-        // Step 1: Tokenize the SQL.
-        let mut tokenizer = Tokenizer::new(sql).peekable();
-
-        // Only tokenize the beginning tokens for `INSERT INTO` statement because the tokens of values is unused.
-        //
-        // Stop the tokenizer on unrecognized token because some values inputs (e.g. CSV) may not be valid for the tokenizer.
-        // See also: https://github.com/datafuselabs/databend/issues/6669
-        let is_insert_stmt = tokenizer
-            .peek()
-            .and_then(|token| Some(token.as_ref().ok()?.kind))
-            == Some(TokenKind::INSERT);
-        let mut tokens: Vec<Token> = if is_insert_stmt {
-            (&mut tokenizer)
-                .take(PROBE_INSERT_INITIAL_TOKENS)
-                .take_while(|token| token.is_ok())
-                // Make sure the tokens stream is always ended with EOI.
-                .chain(std::iter::once(Ok(Token::new_eoi(sql))))
-                .collect::<Result<_>>()
-                .unwrap()
-        } else {
-            (&mut tokenizer).collect::<Result<_>>()?
-        };
-
-        loop {
-            let res = async {
-                // Step 2: Parse the SQL.
-                let backtrace = Backtrace::new();
-                let (stmt, format) = parse_sql(&tokens, sql_dialect, &backtrace)?;
-
-                // Step 3: Bind AST with catalog, and generate a pure logical SExpr
-                let metadata = Arc::new(RwLock::new(Metadata::default()));
-                let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
-                let binder = Binder::new(
-                    self.ctx.clone(),
-                    CatalogManager::instance(),
-                    name_resolution_ctx,
-                    metadata.clone(),
-                );
-                let plan = binder.bind(&stmt).await?;
-
-                // Step 4: Optimize the SExpr with optimizers, and generate optimized physical SExpr
-                let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig {
-                    enable_distributed_optimization: !self.ctx.get_cluster().is_empty(),
-                }));
-                let optimized_plan = optimize(self.ctx.clone(), opt_ctx, plan)?;
-
-                Ok((optimized_plan, metadata.clone(), format))
-            }
-            .await;
-
-            if res.is_err() && matches!(tokenizer.peek(), Some(Ok(_))) {
-                // Remove the previous EOI.
-                tokens.pop();
-                // Tokenize more and try again.
-                if tokens.len() < PROBE_INSERT_MAX_TOKENS {
-                    let iter = (&mut tokenizer)
-                        .take(tokens.len() * 2)
-                        .take_while(|token| token.is_ok())
-                        .chain(std::iter::once(Ok(Token::new_eoi(sql))))
-                        .map(|token| token.unwrap())
-                        // Make sure the tokens stream is always ended with EOI.
-                        .chain(std::iter::once(Token::new_eoi(sql)));
-                    tokens.extend(iter);
-                } else {
-                    let iter = (&mut tokenizer)
-                        .take_while(|token| token.is_ok())
-                        .map(|token| token.unwrap())
-                        // Make sure the tokens stream is always ended with EOI.
-                        .chain(std::iter::once(Token::new_eoi(sql)));
-                    tokens.extend(iter);
-                };
-            } else {
-                return res;
-            }
-        }
-    }
-}
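
The mod.rs change itself is the usual Rust re-export pattern: the implementation moves into a child module, and a `pub use` keeps the old public path working, so callers of `crate::sql::planner::Planner` are unaffected by the file move. A minimal, self-contained sketch of that pattern (generic names, not the Databend source):

// Sketch: `Planner` lives in a private child module `plan`, and the parent
// re-exports it so the public path seen by callers does not change.
mod plan {
    pub struct Planner;

    impl Planner {
        pub fn new() -> Self {
            Planner
        }
    }
}

// Re-export: callers keep importing from the parent module, exactly as they
// did before the implementation moved into its own file.
pub use self::plan::Planner;

fn main() {
    let _planner = Planner::new();
}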
src/query/service/src/sql/planner/plan.rs

Lines changed: 129 additions & 0 deletions

@@ -0,0 +1,129 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_ast::parser::parse_sql;
+use common_ast::parser::token::Token;
+use common_ast::parser::token::TokenKind;
+use common_ast::parser::token::Tokenizer;
+use common_ast::Backtrace;
+use common_catalog::catalog::CatalogManager;
+use common_exception::Result;
+use common_planner::Metadata;
+use common_planner::MetadataRef;
+use parking_lot::RwLock;
+
+use crate::catalogs::CatalogManagerHelper;
+use crate::sessions::TableContext;
+use crate::sql::optimizer::optimize;
+use crate::sql::optimizer::OptimizerConfig;
+use crate::sql::optimizer::OptimizerContext;
+use crate::sql::plans::Plan;
+use crate::sql::Binder;
+use crate::sql::NameResolutionContext;
+
+const PROBE_INSERT_INITIAL_TOKENS: usize = 128;
+const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8;
+
+pub struct Planner {
+    ctx: Arc<dyn TableContext>,
+}
+
+impl Planner {
+    pub fn new(ctx: Arc<dyn TableContext>) -> Self {
+        Planner { ctx }
+    }
+
+    pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, MetadataRef, Option<String>)> {
+        let settings = self.ctx.get_settings();
+        let sql_dialect = settings.get_sql_dialect()?;
+
+        // Step 1: Tokenize the SQL.
+        let mut tokenizer = Tokenizer::new(sql).peekable();
+
+        // Only tokenize the beginning tokens for `INSERT INTO` statement because the tokens of values is unused.
+        //
+        // Stop the tokenizer on unrecognized token because some values inputs (e.g. CSV) may not be valid for the tokenizer.
+        // See also: https://github.com/datafuselabs/databend/issues/6669
+        let is_insert_stmt = tokenizer
+            .peek()
+            .and_then(|token| Some(token.as_ref().ok()?.kind))
+            == Some(TokenKind::INSERT);
+        let mut tokens: Vec<Token> = if is_insert_stmt {
+            (&mut tokenizer)
+                .take(PROBE_INSERT_INITIAL_TOKENS)
+                .take_while(|token| token.is_ok())
+                // Make sure the tokens stream is always ended with EOI.
+                .chain(std::iter::once(Ok(Token::new_eoi(sql))))
+                .collect::<Result<_>>()
+                .unwrap()
+        } else {
+            (&mut tokenizer).collect::<Result<_>>()?
+        };
+
+        loop {
+            let res = async {
+                // Step 2: Parse the SQL.
+                let backtrace = Backtrace::new();
+                let (stmt, format) = parse_sql(&tokens, sql_dialect, &backtrace)?;
+
+                // Step 3: Bind AST with catalog, and generate a pure logical SExpr
+                let metadata = Arc::new(RwLock::new(Metadata::default()));
+                let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
+                let binder = Binder::new(
+                    self.ctx.clone(),
+                    CatalogManager::instance(),
+                    name_resolution_ctx,
+                    metadata.clone(),
+                );
+                let plan = binder.bind(&stmt).await?;
+
+                // Step 4: Optimize the SExpr with optimizers, and generate optimized physical SExpr
+                let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig {
+                    enable_distributed_optimization: !self.ctx.get_cluster().is_empty(),
+                }));
+                let optimized_plan = optimize(self.ctx.clone(), opt_ctx, plan)?;
+
+                Ok((optimized_plan, metadata.clone(), format))
+            }
+            .await;
+
+            if res.is_err() && matches!(tokenizer.peek(), Some(Ok(_))) {
+                // Remove the previous EOI.
+                tokens.pop();
+                // Tokenize more and try again.
+                if tokens.len() < PROBE_INSERT_MAX_TOKENS {
+                    let iter = (&mut tokenizer)
+                        .take(tokens.len() * 2)
+                        .take_while(|token| token.is_ok())
+                        .chain(std::iter::once(Ok(Token::new_eoi(sql))))
+                        .map(|token| token.unwrap())
+                        // Make sure the tokens stream is always ended with EOI.
+                        .chain(std::iter::once(Token::new_eoi(sql)));
+                    tokens.extend(iter);
+                } else {
+                    let iter = (&mut tokenizer)
+                        .take_while(|token| token.is_ok())
+                        .map(|token| token.unwrap())
+                        // Make sure the tokens stream is always ended with EOI.
+                        .chain(std::iter::once(Token::new_eoi(sql)));
+                    tokens.extend(iter);
+                };
+            } else {
+                return res;
+            }
+        }
+    }
+}
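
The new file carries `Planner` over unchanged, so call sites are unaffected. As a rough sketch of how it is driven, assuming a `ctx: Arc<dyn TableContext>` from an existing session and an import path via the re-exports shown in mod.rs above (hypothetical call site, not part of this commit):

// Hypothetical caller of the moved Planner; the session setup that produces
// `ctx` is omitted, and the import path is assumed from the re-exports above.
use std::sync::Arc;

use common_exception::Result;

use crate::sessions::TableContext;
use crate::sql::planner::Planner;

async fn plan_one_statement(ctx: Arc<dyn TableContext>) -> Result<()> {
    let mut planner = Planner::new(ctx);
    // plan_sql tokenizes, parses, binds, and optimizes, returning the
    // optimized Plan, the metadata collected during binding, and an
    // optional output format.
    let (_plan, _metadata, _format) = planner.plan_sql("SELECT 1").await?;
    Ok(())
}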
