Commit 8cccb77

add comment
1 parent fea06da commit 8cccb77

6 files changed: +52 additions, -67 deletions

src/query/ast/src/parser/statement.rs

Lines changed: 14 additions & 22 deletions
@@ -978,28 +978,18 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
 pub fn insert_source(i: Input) -> IResult<InsertSource> {
     let streaming = map(
         rule! {
-            FORMAT ~ #ident ~ #rest_tokens
+            FORMAT ~ #ident ~ #rest_str
         },
-        |(_, format, rest_tokens)| {
-            let rest_str = &rest_tokens[0].source
-                [rest_tokens.first().unwrap().span.start..rest_tokens.last().unwrap().span.end];
-
-            InsertSource::Streaming {
-                format: format.name,
-                rest_str,
-            }
+        |(_, format, rest_str)| InsertSource::Streaming {
+            format: format.name,
+            rest_str,
         },
     );
     let values = map(
         rule! {
-            VALUES ~ #rest_tokens
-        },
-        |(_, rest_tokens)| {
-            let rest_str = &rest_tokens[0].source
-                [rest_tokens.first().unwrap().span.start..rest_tokens.last().unwrap().span.end];
-
-            InsertSource::Values { rest_str }
+            VALUES ~ #rest_str
         },
+        |(_, rest_str)| InsertSource::Values { rest_str },
     );
     let query = map(query, |query| InsertSource::Select {
         query: Box::new(query),
@@ -1012,12 +1002,14 @@ pub fn insert_source(i: Input) -> IResult<InsertSource> {
     )(i)
 }
 
-pub fn rest_tokens<'a>(i: Input<'a>) -> IResult<&'a [Token]> {
-    if i.last().map(|token| token.kind) == Some(EOI) {
-        Ok((i.slice(i.len() - 1..), i.slice(..i.len() - 1).0))
-    } else {
-        Ok((i.slice(i.len()..), i.0))
-    }
+pub fn rest_str<'a>(i: Input<'a>) -> IResult<&'a str> {
+    // It's safe to unwrap because input must contain EOI.
+    let first_token = i.0.first().unwrap();
+    let last_token = i.0.last().unwrap();
+    Ok((
+        i.slice((i.len() - 1)..),
+        &first_token.source[first_token.span.start..last_token.span.end],
+    ))
 }
 
 pub fn column_def(i: Input) -> IResult<ColumnDefinition> {
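
For readers skimming the diff, here is a minimal, self-contained sketch of the span-slicing idea behind the new rest_str combinator. The Token struct and helper below are simplified stand-ins (the real parser works through Input, IResult, and the rule! macro, none of which are reproduced here): the remaining tokens all reference the same source string, so the rest of the statement can be recovered by slicing from the first remaining token's start to the last token's end.

struct Token<'a> {
    // Simplified: the real `Token` also carries a kind, and its span exposes `start`/`end`.
    source: &'a str,
    start: usize,
    end: usize,
}

// Slice the original SQL from the first remaining token to the last one (EOI),
// which is what `rest_str` hands back instead of a `&[Token]`.
fn rest_of_source<'a>(tokens: &[Token<'a>]) -> Option<&'a str> {
    let first = tokens.first()?;
    let last = tokens.last()?;
    Some(&first.source[first.start..last.end])
}

fn main() {
    let sql = "INSERT INTO t(a) VALUES (1), (2)";
    // Pretend the parser already consumed everything before `VALUES`;
    // one remaining token covers the tail of the statement.
    let rest = [Token { source: sql, start: 17, end: sql.len() }];
    assert_eq!(rest_of_source(&rest), Some("VALUES (1), (2)"));
}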

src/query/ast/src/util.rs

Lines changed: 1 addition & 1 deletion
@@ -344,7 +344,7 @@ where
         let span = iter_cloned
             .nth(len - iter.len() - 1)
             .map(|elem| elem.span)
-            // It's safe to slice one more token because EOI is always added.
+            // It's safe to slice one more token because input must contain EOI.
             .unwrap_or_else(|| rest.slice(..1));
 
         nom::Err::Error(Error::from_error_kind(span, err_kind))

src/query/service/src/lib.rs

Lines changed: 0 additions & 1 deletion
@@ -24,7 +24,6 @@
 #![feature(option_get_or_insert_default)]
 #![feature(result_option_inspect)]
 #![feature(is_some_with)]
-#![feature(try_blocks)]
 
 extern crate core;
 
src/query/service/src/sql/planner/mod.rs

Lines changed: 34 additions & 42 deletions
@@ -14,16 +14,12 @@
 
 use std::sync::Arc;
 
-use common_ast::ast::InsertSource;
-use common_ast::ast::InsertStmt;
-use common_ast::ast::Statement;
 use common_ast::parser::parse_sql;
 use common_ast::parser::token::Token;
 use common_ast::parser::token::TokenKind;
 use common_ast::parser::token::Tokenizer;
 use common_ast::Backtrace;
 use common_exception::Result;
-use nom::Offset;
 use parking_lot::RwLock;
 pub use plans::ScalarExpr;
 
@@ -56,7 +52,8 @@ use super::optimizer::OptimizerConfig;
 use super::optimizer::OptimizerContext;
 use crate::sessions::TableContext;
 
-static PROBE_INSERT_MAX_TOKENS: usize = 128;
+const PROBE_INSERT_INITIAL_TOKENS: usize = 128;
+const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8;
 
 pub struct Planner {
     ctx: Arc<QueryContext>,
@@ -71,40 +68,28 @@ impl Planner {
         let settings = self.ctx.get_settings();
         let sql_dialect = settings.get_sql_dialect()?;
 
-        // Step 1: Tokenize the beginning of the SQL, so as to find `INSERT INTO` statement without
-        // tokenizing all SQL.
-        let mut tokenizer = Tokenizer::new(sql);
-        let mut tokens: Vec<Token> = (&mut tokenizer)
-            .take(PROBE_INSERT_MAX_TOKENS)
-            .collect::<Result<_>>()?;
-
-        let mut try_fast_parse_insert =
-            tokens.first().map(|token| token.kind) == Some(TokenKind::INSERT);
+        // Step 1: Tokenize the SQL.
+        let mut tokenizer = Tokenizer::new(sql).peekable();
+        let is_insert_stmt = tokenizer
+            .peek()
+            .and_then(|token| Some(token.as_ref().ok()?.kind))
+            == Some(TokenKind::INSERT);
+        // Only tokenize the beginning tokens for `INSERT INTO` statement because it's unnecessary to tokenize tokens for values.
+        let mut tokens: Vec<Token> = if is_insert_stmt {
+            (&mut tokenizer)
+                .take(PROBE_INSERT_INITIAL_TOKENS)
+                .collect::<Result<_>>()?
+        } else {
+            (&mut tokenizer).collect::<Result<_>>()?
+        };
 
         loop {
-            let res: Result<(Plan, MetadataRef, Option<String>)> = try {
+            let res = async {
+                // Step 2: Parse the SQL.
                 let backtrace = Backtrace::new();
-                let (stmt, format) = if try_fast_parse_insert {
-                    let (mut stmt, format) = parse_sql(&tokens, sql_dialect, &backtrace)?;
-                    // Extend rest_str to the end of the SQL.
-                    if let Statement::Insert(InsertStmt {
-                        source:
-                            InsertSource::Streaming { rest_str, .. } | InsertSource::Values { rest_str },
-                        ..
-                    }) = &mut stmt
-                    {
-                        *rest_str = &sql[sql.offset(rest_str)..];
-                    }
-                    (stmt, format)
-                } else {
-                    // Fall back to tokenize all SQL.
-                    for token in &mut tokenizer {
-                        tokens.push(token?);
-                    }
-                    parse_sql(&tokens, sql_dialect, &backtrace)?
-                };
-
-                // Step 2: bind AST with catalog, and generate a pure logical SExpr
+                let (stmt, format) = parse_sql(&tokens, sql_dialect, &backtrace)?;
+
+                // Step 3: Bind AST with catalog, and generate a pure logical SExpr
                 let metadata = Arc::new(RwLock::new(Metadata::create()));
                 let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
                 let binder = Binder::new(
@@ -115,17 +100,24 @@ impl Planner {
                 );
                 let plan = binder.bind(&stmt).await?;
 
-                // Step 3: optimize the SExpr with optimizers, and generate optimized physical SExpr
+                // Step 4: Optimize the SExpr with optimizers, and generate optimized physical SExpr
                 let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig {
                     enable_distributed_optimization: !self.ctx.get_cluster().is_empty(),
                 }));
                 let optimized_plan = optimize(self.ctx.clone(), opt_ctx, plan)?;
 
-                (optimized_plan, metadata.clone(), format)
-            };
-
-            if try_fast_parse_insert && res.is_err() {
-                try_fast_parse_insert = false;
+                Ok((optimized_plan, metadata.clone(), format))
+            }
+            .await;
+
+            if res.is_err()
+                && tokens.len() < PROBE_INSERT_MAX_TOKENS
+                && matches!(tokenizer.peek(), Some(Ok(_)))
+            {
+                // Tokenize more and try again.
+                for token in (&mut tokenizer).take(tokens.len() * 2) {
+                    tokens.push(token?);
+                }
            } else {
                return res;
            }
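
The retry logic introduced above can be read in isolation. Below is a minimal sketch of the probe-and-grow strategy, assuming a plain token iterator and a toy parse function in place of the real Tokenizer and parse_sql (the names and the success threshold are illustrative only): an INSERT is first parsed from a small prefix of tokens, and on failure the window grows by twice its current size and parsing is retried, until the PROBE_INSERT_MAX_TOKENS cap or the end of the input is reached.

const PROBE_INSERT_INITIAL_TOKENS: usize = 128;
const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8;

// Toy stand-in for `parse_sql`: succeeds only once enough tokens are visible.
fn parse(tokens: &[u32]) -> Result<String, String> {
    if tokens.len() >= 300 {
        Ok(format!("plan built from {} tokens", tokens.len()))
    } else {
        Err("unexpected end of input".to_string())
    }
}

fn plan(all_tokens: &[u32], is_insert: bool) -> Result<String, String> {
    let mut iter = all_tokens.iter().copied();
    // Probe only the beginning of an INSERT; other statements are tokenized fully up front.
    let mut tokens: Vec<u32> = if is_insert {
        (&mut iter).take(PROBE_INSERT_INITIAL_TOKENS).collect()
    } else {
        (&mut iter).collect()
    };

    loop {
        let res = parse(&tokens);
        if res.is_err() && tokens.len() < PROBE_INSERT_MAX_TOKENS {
            // Grow the window by twice its current size and retry.
            let before = tokens.len();
            let grow_by = tokens.len() * 2;
            tokens.extend((&mut iter).take(grow_by));
            if tokens.len() == before {
                // No tokens left to take; surface the last error.
                return res;
            }
        } else {
            return res;
        }
    }
}

fn main() {
    let sql_tokens: Vec<u32> = (0..500).collect();
    // The first probe (128 tokens) fails, the window grows to 384, then parsing succeeds.
    println!("{:?}", plan(&sql_tokens, true));
}

In the real planner the same loop also stops retrying when tokenizer.peek() reports no further tokens, which plays the role of the `before` check above.
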
Lines changed: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
 ---bigint
 5632622125792883430
 ---escape
+"ac "bc
 1a\nb c\td
 2a b cd

tests/suites/0_stateless/14_clickhouse_http_handler/14_0008_tsv_input_format.sh

Lines changed: 2 additions & 1 deletion
@@ -8,7 +8,8 @@ insert into t1(a) format TSV 5632622125792883430
 EOF
 
 cat << EOF > /tmp/databend_test_tsv_escape.txt
-insert into t2(a, b) format TSV 1a\nb c\td
+insert into t2(a, b) format TSV "ac "bc
+1a\nb c\td
 2a\x20b c\Nd
 EOF
 
