
Commit 69f82ef

Merge pull request #7530 from andylokandy/no_start
refactor(query): replace recursion for fast-path insert with loop
2 parents d3aeeef + 14590cc commit 69f82ef

File tree: 9 files changed, +91 -132 lines


src/query/ast/src/ast/format/syntax/dml.rs

Lines changed: 2 additions & 4 deletions
@@ -69,17 +69,15 @@ pub(crate) fn pretty_insert(insert_stmt: InsertStmt) -> RcDoc {
 
 fn pretty_source(source: InsertSource) -> RcDoc {
     RcDoc::line().append(match source {
-        InsertSource::Streaming {
-            format, rest_str, ..
-        } => RcDoc::text("FORMAT")
+        InsertSource::Streaming { format, rest_str } => RcDoc::text("FORMAT")
             .append(RcDoc::space())
             .append(RcDoc::text(format))
             .append(
                 RcDoc::line()
                     .nest(NEST_FACTOR)
                     .append(RcDoc::text(rest_str.to_string())),
             ),
-        InsertSource::Values { rest_str, .. } => RcDoc::text("VALUES").append(
+        InsertSource::Values { rest_str } => RcDoc::text("VALUES").append(
             RcDoc::line()
                 .nest(NEST_FACTOR)
                 .append(RcDoc::text(rest_str.to_string())),

src/query/ast/src/ast/statements/insert.rs

Lines changed: 5 additions & 16 deletions
@@ -56,29 +56,18 @@ impl Display for InsertStmt<'_> {
 
 #[derive(Debug, Clone, PartialEq)]
 pub enum InsertSource<'a> {
-    Streaming {
-        format: String,
-        start: usize,
-        rest_str: &'a str,
-    },
-    Values {
-        start: usize,
-        rest_str: &'a str,
-    },
-    Select {
-        query: Box<Query<'a>>,
-    },
+    Streaming { format: String, rest_str: &'a str },
+    Values { rest_str: &'a str },
+    Select { query: Box<Query<'a>> },
 }
 
 impl Display for InsertSource<'_> {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
-            InsertSource::Streaming {
-                format, rest_str, ..
-            } => {
+            InsertSource::Streaming { format, rest_str } => {
                 write!(f, "FORMAT {format} {rest_str}")
             }
-            InsertSource::Values { rest_str, .. } => write!(f, "VALUES {rest_str}"),
+            InsertSource::Values { rest_str } => write!(f, "VALUES {rest_str}"),
             InsertSource::Select { query } => write!(f, "{query}"),
         }
     }
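
With the `start` offsets gone, each variant collapses to a single line. A minimal compile-checked sketch of the simplified shape and its `Display` behavior (here `Query` is mocked as `String`; in the crate it is the parsed SELECT AST):

use std::fmt::{self, Display, Formatter};

#[derive(Debug, Clone, PartialEq)]
pub enum InsertSource<'a> {
    Streaming { format: String, rest_str: &'a str },
    Values { rest_str: &'a str },
    Select { query: Box<String> }, // Query mocked as String for this sketch
}

impl Display for InsertSource<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            InsertSource::Streaming { format, rest_str } => {
                write!(f, "FORMAT {format} {rest_str}")
            }
            InsertSource::Values { rest_str } => write!(f, "VALUES {rest_str}"),
            InsertSource::Select { query } => write!(f, "{query}"),
        }
    }
}

fn main() {
    // Values taken from the updated test data further below.
    let values = InsertSource::Values { rest_str: "(1, 2), (3, 4);" };
    assert_eq!(values.to_string(), "VALUES (1, 2), (3, 4);");
    let streaming = InsertSource::Streaming { format: "json".to_string(), rest_str: ";" };
    assert_eq!(streaming.to_string(), "FORMAT json ;");
}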

src/query/ast/src/parser/statement.rs

Lines changed: 14 additions & 26 deletions
@@ -978,32 +978,18 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
 pub fn insert_source(i: Input) -> IResult<InsertSource> {
     let streaming = map(
         rule! {
-            FORMAT ~ #ident ~ #rest_tokens
+            FORMAT ~ #ident ~ #rest_str
         },
-        |(_, format, rest_tokens)| {
-            let rest_str = &rest_tokens[0].source
-                [rest_tokens.first().unwrap().span.start..rest_tokens.last().unwrap().span.end];
-
-            InsertSource::Streaming {
-                format: format.name,
-                start: rest_tokens.first().unwrap().span.start,
-                rest_str,
-            }
+        |(_, format, rest_str)| InsertSource::Streaming {
+            format: format.name,
+            rest_str,
         },
     );
     let values = map(
         rule! {
-            VALUES ~ #rest_tokens
-        },
-        |(_, rest_tokens)| {
-            let rest_str = &rest_tokens[0].source
-                [rest_tokens.first().unwrap().span.start..rest_tokens.last().unwrap().span.end];
-
-            InsertSource::Values {
-                rest_str,
-                start: rest_tokens.first().unwrap().span.start,
-            }
+            VALUES ~ #rest_str
         },
+        |(_, rest_str)| InsertSource::Values { rest_str },
     );
     let query = map(query, |query| InsertSource::Select {
         query: Box::new(query),
@@ -1016,12 +1002,14 @@ pub fn insert_source(i: Input) -> IResult<InsertSource> {
     )(i)
 }
 
-pub fn rest_tokens<'a>(i: Input<'a>) -> IResult<&'a [Token]> {
-    if i.last().map(|token| token.kind) == Some(EOI) {
-        Ok((i.slice(i.len() - 1..), i.slice(..i.len() - 1).0))
-    } else {
-        Ok((i.slice(i.len()..), i.0))
-    }
+pub fn rest_str<'a>(i: Input<'a>) -> IResult<&'a str> {
+    // It's safe to unwrap because input must contain EOI.
+    let first_token = i.0.first().unwrap();
+    let last_token = i.0.last().unwrap();
+    Ok((
+        i.slice((i.len() - 1)..),
+        &first_token.source[first_token.span.start..last_token.span.end],
+    ))
 }
 
 pub fn column_def(i: Input) -> IResult<ColumnDefinition> {
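
The new `rest_str` combinator replaces `rest_tokens`: rather than returning the remaining token slice and threading a separate `start` offset through the AST, it slices the original SQL text from the first remaining token through EOI in one step. A standalone sketch of that span-slicing idea, using a simplified stand-in for the real `Token` type (the `Tok` struct and its fields are illustrative):

// Simplified stand-in for the parser's Token: a view into the full SQL text.
struct Tok<'a> {
    source: &'a str, // the complete SQL string being tokenized
    start: usize,    // byte offset where this token begins
    end: usize,      // byte offset just past this token
}

// Slice the raw text covered by all remaining tokens, as rest_str does.
// Unwrapping relies on the same invariant as the real parser: the token
// stream always ends with an EOI token, so it is never empty.
fn rest_of_source<'a>(tokens: &[Tok<'a>]) -> &'a str {
    let first = tokens.first().expect("token stream always contains EOI");
    let last = tokens.last().expect("token stream always contains EOI");
    &first.source[first.start..last.end]
}

fn main() {
    let sql = "INSERT INTO t VALUES (1, 2), (3, 4);";
    // Pretend these are the tokens left after the VALUES keyword is consumed.
    let rest = [
        Tok { source: sql, start: 21, end: 28 },        // "(1, 2),"
        Tok { source: sql, start: 29, end: sql.len() }, // "(3, 4);"
    ];
    assert_eq!(rest_of_source(&rest), "(1, 2), (3, 4);");
}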

src/query/ast/src/util.rs

Lines changed: 1 addition & 1 deletion
@@ -344,7 +344,7 @@ where
     let span = iter_cloned
         .nth(len - iter.len() - 1)
         .map(|elem| elem.span)
-        // It's safe to slice one more token because EOI is always added.
+        // It's safe to slice one more token because input must contain EOI.
         .unwrap_or_else(|| rest.slice(..1));
 
     nom::Err::Error(Error::from_error_kind(span, err_kind))

src/query/ast/tests/it/testdata/statement.txt

Lines changed: 0 additions & 2 deletions
@@ -4335,7 +4335,6 @@ Insert(
         },
     ],
     source: Values {
-        start: 30,
         rest_str: "(1, 2), (3, 4);",
     },
     overwrite: false,
@@ -4360,7 +4359,6 @@ Insert(
     columns: [],
     source: Streaming {
         format: "json",
-        start: 31,
         rest_str: ";",
     },
     overwrite: false,

src/query/service/src/sql/planner/binder/insert.rs

Lines changed: 1 addition & 3 deletions
@@ -108,9 +108,7 @@ impl<'a> Binder {
         };
 
         let input_source: Result<InsertInputSource> = match source.clone() {
-            InsertSource::Streaming {
-                format, rest_str, ..
-            } => {
+            InsertSource::Streaming { format, rest_str } => {
                 self.analyze_stream_format(bind_context, rest_str, Some(format), schema.clone())
                     .await
             }

src/query/service/src/sql/planner/mod.rs

Lines changed: 65 additions & 79 deletions
@@ -14,12 +14,10 @@
 
 use std::sync::Arc;
 
-use common_ast::ast::InsertSource;
-use common_ast::ast::Statement;
 use common_ast::parser::parse_sql;
+use common_ast::parser::token::Token;
 use common_ast::parser::token::TokenKind;
 use common_ast::parser::token::Tokenizer;
-use common_ast::parser::tokenize_sql;
 use common_ast::Backtrace;
 use common_exception::Result;
 use parking_lot::RwLock;
@@ -54,7 +52,8 @@ use super::optimizer::OptimizerConfig;
 use super::optimizer::OptimizerContext;
 use crate::sessions::TableContext;
 
-static MAX_TOKEN_FOR_INSERT: usize = 128;
+const PROBE_INSERT_INITIAL_TOKENS: usize = 128;
+const PROBE_INSERT_MAX_TOKENS: usize = 128 * 8;
 
 pub struct Planner {
     ctx: Arc<QueryContext>,
@@ -66,86 +65,73 @@ impl Planner {
     }
 
     pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, MetadataRef, Option<String>)> {
-        let mut tokenizer = Tokenizer::new(sql);
-        if let Some(Ok(t)) = tokenizer.next() {
-            if t.kind == TokenKind::INSERT {
-                if let Some(Ok(last_token)) = tokenizer.take(MAX_TOKEN_FOR_INSERT).last() {
-                    match self.plan_sql_inner(sql, Some(last_token.span.start)).await {
-                        Ok(v) => return Ok(v),
-                        Err(err) => {
-                            tracing::warn!(
-                                "Faster parser path for insert failed: {}, start to fallback",
-                                err
-                            );
-                        }
-                    }
-                }
-            }
-        }
-        // fallback to normal plan_sql_inner
-        self.plan_sql_inner(sql, None).await
-    }
-
-    #[async_recursion::async_recursion]
-    async fn plan_sql_inner(
-        &mut self,
-        sql: &str,
-        last_token_index: Option<usize>,
-    ) -> Result<(Plan, MetadataRef, Option<String>)> {
         let settings = self.ctx.get_settings();
         let sql_dialect = settings.get_sql_dialect()?;
 
-        // Step 1: parse SQL text into AST
-        // If last_token_index is set, we will try to parse from the short_sql.
-        // If any error happens, it will fallback to default parser logic.
-        let short_sql = last_token_index.map(|index| &sql[..index]).unwrap_or(sql);
-        let tokens = tokenize_sql(short_sql)?;
-        let backtrace = Backtrace::new();
-
-        let (mut stmt, format) = parse_sql(&tokens, sql_dialect, &backtrace)?;
-        if last_token_index.is_some() {
-            let mut should_fallback = true;
-            if let Statement::Insert(ref mut insert) = stmt {
-                match &mut insert.source {
-                    InsertSource::Streaming {
-                        start, rest_str, ..
-                    } => {
-                        *rest_str = &sql[*start..];
-                        should_fallback = false;
-                    }
-
-                    InsertSource::Values {
-                        start, rest_str, ..
-                    } => {
-                        *rest_str = &sql[*start..];
-                        should_fallback = false;
-                    }
-                    _ => {}
-                }
+        // Step 1: Tokenize the SQL.
+        let mut tokenizer = Tokenizer::new(sql).peekable();
+        let is_insert_stmt = tokenizer
+            .peek()
+            .and_then(|token| Some(token.as_ref().ok()?.kind))
+            == Some(TokenKind::INSERT);
+        // Only tokenize the beginning tokens for `INSERT INTO` statement because it's unnecessary to tokenize tokens for values.
+        //
+        // Stop the tokenizer on unrecognized token because some values inputs (e.g. CSV) may not be recognized by the tokenizer.
+        // See also: https://github.com/datafuselabs/databend/issues/6669
+        let mut tokens: Vec<Token> = if is_insert_stmt {
+            (&mut tokenizer)
+                .take(PROBE_INSERT_INITIAL_TOKENS)
+                .take_while(|token| token.is_ok())
+                .collect::<Result<_>>()
+                .unwrap()
+        } else {
+            (&mut tokenizer).collect::<Result<_>>()?
+        };
+
+        loop {
+            let res = async {
+                // Step 2: Parse the SQL.
+                let backtrace = Backtrace::new();
+                let (stmt, format) = parse_sql(&tokens, sql_dialect, &backtrace)?;
+
+                // Step 3: Bind AST with catalog, and generate a pure logical SExpr
+                let metadata = Arc::new(RwLock::new(Metadata::create()));
+                let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
+                let binder = Binder::new(
+                    self.ctx.clone(),
+                    self.ctx.get_catalog_manager()?,
+                    name_resolution_ctx,
+                    metadata.clone(),
+                );
+                let plan = binder.bind(&stmt).await?;
+
+                // Step 4: Optimize the SExpr with optimizers, and generate optimized physical SExpr
+                let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig {
+                    enable_distributed_optimization: !self.ctx.get_cluster().is_empty(),
+                }));
+                let optimized_plan = optimize(self.ctx.clone(), opt_ctx, plan)?;
+
+                Ok((optimized_plan, metadata.clone(), format))
             }
-            // fallback to normal plan_sql_inner
-            if should_fallback {
-                return self.plan_sql_inner(sql, None).await;
+            .await;
+
+            if res.is_err() && matches!(tokenizer.peek(), Some(Ok(_))) {
+                // Tokenize more and try again.
+                if tokens.len() < PROBE_INSERT_MAX_TOKENS {
+                    let iter = (&mut tokenizer)
+                        .take(tokens.len() * 2)
+                        .take_while(|token| token.is_ok())
+                        .map(|token| token.unwrap());
+                    tokens.extend(iter);
+                } else {
+                    let iter = (&mut tokenizer)
+                        .take_while(|token| token.is_ok())
+                        .map(|token| token.unwrap());
+                    tokens.extend(iter);
+                };
+            } else {
+                return res;
             }
         }
-
-        // Step 2: bind AST with catalog, and generate a pure logical SExpr
-        let metadata = Arc::new(RwLock::new(Metadata::create()));
-        let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
-        let binder = Binder::new(
-            self.ctx.clone(),
-            self.ctx.get_catalog_manager()?,
-            name_resolution_ctx,
-            metadata.clone(),
-        );
-        let plan = binder.bind(&stmt).await?;
-
-        // Step 3: optimize the SExpr with optimizers, and generate optimized physical SExpr
-        let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig {
-            enable_distributed_optimization: !self.ctx.get_cluster().is_empty(),
-        }));
-        let optimized_plan = optimize(self.ctx.clone(), opt_ctx, plan)?;
-
-        Ok((optimized_plan, metadata.clone(), format))
     }
 }
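
The recursive fast path, along with its `async_recursion` dependency, is replaced by an iterative probe: parse a small token prefix, and on failure extend the window by twice its current length (128, 384, 1152, ...) until `PROBE_INSERT_MAX_TOKENS` is crossed, after which the remaining input is taken in one go. A standalone sketch of just that growth schedule (`next_window` is illustrative, not a function in the codebase):

// Growth schedule of the token probe window, mirroring the planner loop:
// extend by 2x the current length while under the cap, then take the rest.
fn next_window(len: usize, cap: usize, total: usize) -> usize {
    if len < cap {
        (len + len * 2).min(total)
    } else {
        total
    }
}

fn main() {
    const INITIAL: usize = 128; // PROBE_INSERT_INITIAL_TOKENS
    const CAP: usize = 128 * 8; // PROBE_INSERT_MAX_TOKENS
    let total = 100_000; // pretend the INSERT tokenizes to this many tokens
    let mut window = INITIAL;
    while window < total {
        window = next_window(window, CAP, total);
        println!("retry with {window} tokens"); // prints 384, 1152, 100000
    }
}
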
Lines changed: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
 ---bigint
 5632622125792883430
 ---escape
+"ac "bc
 1a\nb c\td
 2a b cd

tests/suites/0_stateless/14_clickhouse_http_handler/14_0008_tsv_input_format.sh

Lines changed: 2 additions & 1 deletion
@@ -8,7 +8,8 @@ insert into t1(a) format TSV 5632622125792883430
 EOF
 
 cat << EOF > /tmp/databend_test_tsv_escape.txt
-insert into t2(a, b) format TSV 1a\nb c\td
+insert into t2(a, b) format TSV "ac "bc
+1a\nb c\td
 2a\x20b c\Nd
 EOF
 
