Skip to content

Commit e48ca07

Browse files
SkyFan2002dantengskyBohuTANG
authored
feat: support multi-table insert (#15002)
* parse insert first * fix parse * init interpreter * part of pipeline * fix bind subquery * save before clippy fix * fix clippy * add parser ut * add binder test * fmt * bind source columns as projection * make DuplicateProcessor more general * add Duplicat plan * add Shuffle plan * add several physical plan * unconditional multi table insert pipeline * add table id in CommitMeta * finish unconditional pipeline * lint * clippy * fix compile error * fix privilege check * fix typos * fix slt * add test * fix compile error * conditional insert all * support insert first * support source columns * support cast schema * support fill default and reorder * support cluster key * support multi stmt txn * make lint * fix cluster key * support change tracking * fix filter index * fix shuffle * fix compile err * support same table in multi branch * fix insert first filter * Update src/query/pipeline/core/src/pipeline.rs Co-authored-by: dantengsky <dantengsky@gmail.com> * fix wrong test file name * FIRST no need be a reserved ident * fix typos * rename symbol * remove not universal shuffle strategy * fix force finish together * peek first token first * Revert "remove not universal shuffle strategy" This reverts commit 9a6aef1. * shuffle return Result * add comments * make lint * fix compile error * split test file --------- Co-authored-by: dantengsky <dantengsky@gmail.com> Co-authored-by: Bohu <overred.shuttler@gmail.com>
1 parent 88a884e commit e48ca07

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+2923
-125
lines changed
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt::Display;
16+
17+
use derive_visitor::Drive;
18+
use derive_visitor::DriveMut;
19+
20+
use crate::ast::write_comma_separated_list;
21+
use crate::ast::Expr;
22+
use crate::ast::Identifier;
23+
use crate::ast::Query;
24+
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
25+
pub struct IntoClause {
26+
pub catalog: Option<Identifier>,
27+
pub database: Option<Identifier>,
28+
pub table: Identifier,
29+
pub target_columns: Vec<Identifier>,
30+
pub source_columns: Vec<Identifier>,
31+
}
32+
33+
impl Display for IntoClause {
34+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
35+
write!(f, "INTO ")?;
36+
if let Some(catalog) = &self.catalog {
37+
write!(f, "{}.", catalog)?;
38+
}
39+
if let Some(database) = &self.database {
40+
write!(f, "{}.", database)?;
41+
}
42+
write!(f, "{}", self.table)?;
43+
if !self.target_columns.is_empty() {
44+
write!(f, " (")?;
45+
write_comma_separated_list(f, &self.target_columns)?;
46+
write!(f, ")")?;
47+
}
48+
if !self.source_columns.is_empty() {
49+
write!(f, " VALUES ")?;
50+
write!(f, " (")?;
51+
write_comma_separated_list(f, &self.source_columns)?;
52+
write!(f, ")")?;
53+
}
54+
Ok(())
55+
}
56+
}
57+
58+
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
59+
pub struct WhenClause {
60+
pub condition: Expr,
61+
pub into_clauses: Vec<IntoClause>,
62+
}
63+
64+
impl Display for WhenClause {
65+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
66+
write!(f, "WHEN ")?;
67+
self.condition.fmt(f)?;
68+
write!(f, " THEN ")?;
69+
for into_clause in &self.into_clauses {
70+
write!(f, "{} ", into_clause)?;
71+
}
72+
Ok(())
73+
}
74+
}
75+
76+
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
77+
pub struct ElseClause {
78+
pub into_clauses: Vec<IntoClause>,
79+
}
80+
81+
impl Display for ElseClause {
82+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
83+
write!(f, "ELSE ")?;
84+
for into_clause in &self.into_clauses {
85+
write!(f, "{} ", into_clause)?;
86+
}
87+
Ok(())
88+
}
89+
}
90+
91+
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
92+
pub struct InsertMultiTableStmt {
93+
#[drive(skip)]
94+
pub overwrite: bool,
95+
#[drive(skip)]
96+
pub is_first: bool,
97+
pub when_clauses: Vec<WhenClause>,
98+
pub else_clause: Option<ElseClause>,
99+
pub into_clauses: Vec<IntoClause>,
100+
pub source: Query,
101+
}
102+
103+
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
104+
pub enum InsertMultiTableKind {
105+
First,
106+
All,
107+
}
108+
109+
impl Display for InsertMultiTableStmt {
110+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
111+
write!(f, "INSERT ")?;
112+
if self.overwrite {
113+
write!(f, "OVERWRITE ")?;
114+
}
115+
match &self.is_first {
116+
true => write!(f, "FIRST ")?,
117+
false => write!(f, "ALL ")?,
118+
}
119+
for when in &self.when_clauses {
120+
write!(f, "{} ", when)?;
121+
}
122+
if let Some(else_clause) = &self.else_clause {
123+
write!(f, "{} ", else_clause)?;
124+
}
125+
for into_clause in &self.into_clauses {
126+
write!(f, "{} ", into_clause)?;
127+
}
128+
write!(f, "{}", self.source)
129+
}
130+
}

src/query/ast/src/ast/statements/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod explain;
2424
mod hint;
2525
mod index;
2626
mod insert;
27+
mod insert_multi_table;
2728
mod kill;
2829
mod lock;
2930
mod merge_into;
@@ -60,6 +61,7 @@ pub use explain::*;
6061
pub use hint::*;
6162
pub use index::*;
6263
pub use insert::*;
64+
pub use insert_multi_table::*;
6365
pub use kill::*;
6466
pub use lock::*;
6567
pub use merge_into::*;

src/query/ast/src/ast/statements/statement.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub enum Statement {
104104
},
105105

106106
Insert(InsertStmt),
107+
InsertMultiTable(InsertMultiTableStmt),
107108
Replace(ReplaceStmt),
108109
MergeInto(MergeIntoStmt),
109110
Delete(DeleteStmt),
@@ -415,6 +416,7 @@ impl Display for Statement {
415416
}
416417
Statement::Query(stmt) => write!(f, "{stmt}")?,
417418
Statement::Insert(stmt) => write!(f, "{stmt}")?,
419+
Statement::InsertMultiTable(insert_multi_table) => write!(f, "{insert_multi_table}")?,
418420
Statement::Replace(stmt) => write!(f, "{stmt}")?,
419421
Statement::MergeInto(stmt) => write!(f, "{stmt}")?,
420422
Statement::Delete(stmt) => write!(f, "{stmt}")?,

src/query/ast/src/ast/visitors/walk.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,5 +567,6 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem
567567
Statement::Begin => {}
568568
Statement::Commit => {}
569569
Statement::Abort => {}
570+
Statement::InsertMultiTable(_) => {}
570571
}
571572
}

src/query/ast/src/ast/visitors/walk_mut.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,5 +565,6 @@ pub fn walk_statement_mut<V: VisitorMut>(visitor: &mut V, statement: &mut Statem
565565
Statement::AlterNotification(stmt) => visitor.visit_alter_notification(stmt),
566566
Statement::DropNotification(stmt) => visitor.visit_drop_notification(stmt),
567567
Statement::DescribeNotification(stmt) => visitor.visit_describe_notification(stmt),
568+
Statement::InsertMultiTable(_) => {}
568569
}
569570
}

src/query/ast/src/parser/statement.rs

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2063,7 +2063,9 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
20632063
| #show_password_policies: "`SHOW PASSWORD POLICIES [<show_options>]`"
20642064
),
20652065
rule!(
2066-
#insert_stmt(false) : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
2066+
#conditional_multi_table_insert() : "`INSERT [OVERWRITE] {FIRST|ALL} { WHEN <condition> THEN intoClause [ ... ] } [ ... ] [ ELSE intoClause ] <subquery>`"
2067+
| #unconditional_multi_table_insert() : "`INSERT [OVERWRITE] ALL intoClause [ ... ] <subquery>`"
2068+
| #insert_stmt(false) : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
20672069
| #replace_stmt(false) : "`REPLACE INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
20682070
| #merge : "`MERGE INTO <target_table> USING <source> ON <join_expr> { matchedClause | notMatchedClause } [ ... ]`"
20692071
| #delete : "`DELETE FROM <table> [WHERE ...]`"
@@ -2290,6 +2292,89 @@ pub fn insert_stmt(allow_raw: bool) -> impl FnMut(Input) -> IResult<Statement> {
22902292
}
22912293
}
22922294

2295+
pub fn conditional_multi_table_insert() -> impl FnMut(Input) -> IResult<Statement> {
2296+
move |i| {
2297+
map(
2298+
rule! {
2299+
INSERT ~ OVERWRITE? ~ (FIRST | ALL) ~ (#when_clause)+ ~ (#else_clause)? ~ #query
2300+
},
2301+
|(_, overwrite, kind, when_clauses, opt_else, source)| {
2302+
Statement::InsertMultiTable(InsertMultiTableStmt {
2303+
overwrite: overwrite.is_some(),
2304+
is_first: matches!(kind.kind, FIRST),
2305+
when_clauses,
2306+
else_clause: opt_else,
2307+
into_clauses: vec![],
2308+
source,
2309+
})
2310+
},
2311+
)(i)
2312+
}
2313+
}
2314+
2315+
pub fn unconditional_multi_table_insert() -> impl FnMut(Input) -> IResult<Statement> {
2316+
move |i| {
2317+
map(
2318+
rule! {
2319+
INSERT ~ OVERWRITE? ~ ALL ~ (#into_clause)+ ~ #query
2320+
},
2321+
|(_, overwrite, _, into_clauses, source)| {
2322+
Statement::InsertMultiTable(InsertMultiTableStmt {
2323+
overwrite: overwrite.is_some(),
2324+
is_first: false,
2325+
when_clauses: vec![],
2326+
else_clause: None,
2327+
into_clauses,
2328+
source,
2329+
})
2330+
},
2331+
)(i)
2332+
}
2333+
}
2334+
2335+
fn when_clause(i: Input) -> IResult<WhenClause> {
2336+
map(
2337+
rule! {
2338+
WHEN ~ ^#expr ~ THEN ~ (#into_clause)+
2339+
},
2340+
|(_, expr, _, into_clauses)| WhenClause {
2341+
condition: expr,
2342+
into_clauses,
2343+
},
2344+
)(i)
2345+
}
2346+
2347+
fn into_clause(i: Input) -> IResult<IntoClause> {
2348+
map(
2349+
rule! {
2350+
INTO
2351+
~ #dot_separated_idents_1_to_3
2352+
~ ( "(" ~ #comma_separated_list1(ident) ~ ")" )?
2353+
~ (VALUES ~ "(" ~ #comma_separated_list1(ident) ~ ")" )?
2354+
},
2355+
|(_, (catalog, database, table), opt_target_columns, opt_source_columns)| IntoClause {
2356+
catalog,
2357+
database,
2358+
table,
2359+
target_columns: opt_target_columns
2360+
.map(|(_, columns, _)| columns)
2361+
.unwrap_or_default(),
2362+
source_columns: opt_source_columns
2363+
.map(|(_, _, columns, _)| columns)
2364+
.unwrap_or_default(),
2365+
},
2366+
)(i)
2367+
}
2368+
2369+
fn else_clause(i: Input) -> IResult<ElseClause> {
2370+
map(
2371+
rule! {
2372+
ELSE ~ (#into_clause)+
2373+
},
2374+
|(_, into_clauses)| ElseClause { into_clauses },
2375+
)(i)
2376+
}
2377+
22932378
pub fn replace_stmt(allow_raw: bool) -> impl FnMut(Input) -> IResult<Statement> {
22942379
move |i| {
22952380
let insert_source_parser = if allow_raw {

src/query/ast/tests/it/display.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_ast::parser::parse_sql;
16+
use databend_common_ast::parser::tokenize_sql;
17+
use databend_common_ast::parser::Dialect;
18+
19+
fn test_stmt_display(sql: &str) {
20+
let tokens = tokenize_sql(sql).unwrap();
21+
let (stmt, _) = parse_sql(&tokens, Dialect::PostgreSQL).unwrap();
22+
let sql1 = stmt.to_string();
23+
let tokens1 = tokenize_sql(&sql1).unwrap();
24+
let (stmt1, _) = parse_sql(&tokens1, Dialect::PostgreSQL).unwrap();
25+
let sql2 = stmt1.to_string();
26+
assert_eq!(sql1, sql2);
27+
}
28+
29+
#[test]
30+
fn test_multi_table_insert_display() {
31+
const SQL_FILE_PATH: &str = "tests/it/testsql/multi_table_insert.sql";
32+
let sqls = std::fs::read_to_string(SQL_FILE_PATH).unwrap();
33+
for sql in sqls.split(';').filter(|s| !s.is_empty()) {
34+
test_stmt_display(sql);
35+
}
36+
}
37+
38+
#[test]
39+
fn test_multi_table_insert_parse_error() {
40+
const SQL_FILE_PATH: &str = "tests/it/testsql/multi_table_insert_error.sql";
41+
let sqls = std::fs::read_to_string(SQL_FILE_PATH).unwrap();
42+
for sql in sqls.split(';').filter(|s| !s.is_empty()) {
43+
let tokens = tokenize_sql(sql).unwrap();
44+
assert!(parse_sql(&tokens, Dialect::PostgreSQL).is_err());
45+
}
46+
}

src/query/ast/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515
#![allow(clippy::uninlined_format_args)]
1616

1717
mod decimal;
18+
mod display;
1819
mod parser;
1920
mod token;
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
INSERT FIRST
2+
WHEN n1 > 100 THEN
3+
INTO t1
4+
WHEN n1 > 10 THEN
5+
INTO t1
6+
INTO t2
7+
ELSE
8+
INTO t2
9+
SELECT n1 from src;
10+
11+
INSERT OVERWRITE FIRST
12+
WHEN n1 > 100 THEN
13+
INTO t1
14+
WHEN n1 > 10 THEN
15+
INTO t1
16+
INTO t2
17+
ELSE
18+
INTO t2
19+
SELECT n1 from src;
20+
21+
INSERT OVERWRITE ALL
22+
WHEN n1 > 100 THEN
23+
INTO t1
24+
WHEN n1 > 10 THEN
25+
INTO t1
26+
INTO t2
27+
ELSE
28+
INTO t2
29+
SELECT n1 from src;
30+
31+
INSERT OVERWRITE ALL
32+
INTO t1
33+
INTO t2
34+
SELECT n1 from src;
35+
36+
INSERT OVERWRITE FIRST
37+
WHEN n1 > 100 THEN
38+
INTO t1
39+
WHEN n1 > 10 THEN
40+
INTO t1
41+
INTO t2
42+
SELECT n1 from src;
43+
44+
INSERT OVERWRITE ALL
45+
WHEN n1 > 100 THEN
46+
INTO t1
47+
WHEN n1 > 10 THEN
48+
INTO t1
49+
INTO t2
50+
SELECT n1 from src;
51+
52+
INSERT ALL
53+
INTO t1
54+
INTO t1 (c1, c2, c3) VALUES (n2, n1, n3)
55+
INTO t2 (c1, c2, c3)
56+
INTO t2 VALUES (n3, n2, n1)
57+
SELECT n1, n2, n3 from src;
58+
59+
INSERT OVERWRITE ALL
60+
INTO t1
61+
INTO t1 (c1, c2, c3) VALUES (n2, n1, n3)
62+
INTO t2 (c1, c2, c3)
63+
INTO t2 VALUES (n3, n2, n1)
64+
SELECT n1, n2, n3 from src;

0 commit comments

Comments
 (0)