Skip to content

Commit 8633f73

Browse files
authored
feat: adjust syntax of streaming load insert SQL. (#18075)
1 parent dda7fb1 commit 8633f73

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+117
-59
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/bendpy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ crate-type = ["cdylib"]
1818
arrow = { workspace = true, features = ["pyarrow"] }
1919
arrow-schema = { workspace = true }
2020
ctor = { workspace = true }
21+
databend-common-catalog = { workspace = true }
2122
databend-common-config = { workspace = true }
2223
databend-common-exception = { workspace = true }
2324
databend-common-expression = { workspace = true }

src/bendpy/src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use databend_common_catalog::session_type::SessionType;
1718
use databend_common_config::GlobalConfig;
1819
use databend_common_exception::Result;
1920
use databend_common_meta_app::principal::GrantObject;
@@ -24,7 +25,6 @@ use databend_common_users::UserApiProvider;
2425
use databend_query::sessions::QueryContext;
2526
use databend_query::sessions::Session;
2627
use databend_query::sessions::SessionManager;
27-
use databend_query::sessions::SessionType;
2828
use databend_query::sql::Planner;
2929
use pyo3::prelude::*;
3030

src/bendsave/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ edition = "2021"
88

99
[dependencies]
1010
databend-common-base = { workspace = true }
11+
databend-common-catalog = { workspace = true }
1112
databend-common-config = { workspace = true }
1213
databend-common-license = { workspace = true }
1314
databend-common-meta-client = { workspace = true }

src/bendsave/src/storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use bytes::Bytes;
2424
use bytes::BytesMut;
2525
use databend_common_base::base::GlobalInstance;
2626
use databend_common_base::runtime::GlobalIORuntime;
27+
use databend_common_catalog::session_type::SessionType;
2728
use databend_common_config::Config;
2829
use databend_common_config::GlobalConfig;
2930
use databend_common_config::InnerConfig;
@@ -38,7 +39,6 @@ use databend_common_users::builtin::BuiltIn;
3839
use databend_common_users::UserApiProvider;
3940
use databend_enterprise_query::license::RealLicenseManager;
4041
use databend_query::sessions::SessionManager;
41-
use databend_query::sessions::SessionType;
4242
use futures::TryStream;
4343
use futures::TryStreamExt;
4444
use log::debug;

src/query/ast/src/ast/statements/insert.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ pub enum InsertSource {
8585
StreamingLoad {
8686
format_options: FileFormatOptions,
8787
on_error_mode: Option<String>,
88+
value: Option<Vec<Expr>>,
8889
},
8990
}
9091

@@ -106,9 +107,15 @@ impl Display for InsertSource {
106107
InsertSource::RawValues { rest_str, .. } => write!(f, "VALUES {rest_str}"),
107108
InsertSource::Select { query } => write!(f, "{query}"),
108109
InsertSource::StreamingLoad {
110+
value,
109111
format_options,
110112
on_error_mode,
111113
} => {
114+
if let Some(value) = value {
115+
write!(f, "(")?;
116+
write_comma_separated_list(f, value)?;
117+
write!(f, ")")?;
118+
}
112119
write!(f, " FILE_FORMAT = ({})", format_options)?;
113120
write!(
114121
f,

src/query/ast/src/parser/parser.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,17 @@ pub fn parse_values(tokens: &[Token], dialect: Dialect) -> Result<Vec<Expr>> {
8181
run_parser(tokens, dialect, ParseMode::Default, false, values)
8282
}
8383

84-
pub fn parse_raw_insert_stmt(tokens: &[Token], dialect: Dialect) -> Result<Statement> {
84+
pub fn parse_raw_insert_stmt(
85+
tokens: &[Token],
86+
dialect: Dialect,
87+
in_streaming_load: bool,
88+
) -> Result<Statement> {
8589
run_parser(
8690
tokens,
8791
dialect,
8892
ParseMode::Default,
8993
false,
90-
insert_stmt(true),
94+
insert_stmt(true, in_streaming_load),
9195
)
9296
}
9397

src/query/ast/src/parser/statement.rs

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2605,7 +2605,7 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
26052605
rule!(
26062606
#conditional_multi_table_insert() : "`INSERT [OVERWRITE] {FIRST|ALL} { WHEN <condition> THEN intoClause [ ... ] } [ ... ] [ ELSE intoClause ] <subquery>`"
26072607
| #unconditional_multi_table_insert() : "`INSERT [OVERWRITE] ALL intoClause [ ... ] <subquery>`"
2608-
| #insert_stmt(false) : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
2608+
| #insert_stmt(false, false) : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (VALUES <values> | <query>)`"
26092609
| #replace_stmt(false) : "`REPLACE INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
26102610
| #merge : "`MERGE INTO <target_table> USING <source> ON <join_expr> { matchedClause | notMatchedClause } [ ... ]`"
26112611
| #delete : "`DELETE FROM <table> [WHERE ...]`"
@@ -2798,10 +2798,15 @@ pub fn parse_create_option(
27982798
}
27992799
}
28002800

2801-
pub fn insert_stmt(allow_raw: bool) -> impl FnMut(Input) -> IResult<Statement> {
2801+
pub fn insert_stmt(
2802+
allow_raw: bool,
2803+
in_streaming_load: bool,
2804+
) -> impl FnMut(Input) -> IResult<Statement> {
28022805
move |i| {
2803-
let insert_source_parser = if allow_raw {
2804-
raw_insert_source
2806+
let insert_source_parser = if in_streaming_load {
2807+
insert_source_file
2808+
} else if allow_raw {
2809+
insert_source_fast_values
28052810
} else {
28062811
insert_source
28072812
};
@@ -2935,7 +2940,7 @@ fn else_clause(i: Input) -> IResult<ElseClause> {
29352940
pub fn replace_stmt(allow_raw: bool) -> impl FnMut(Input) -> IResult<Statement> {
29362941
move |i| {
29372942
let insert_source_parser = if allow_raw {
2938-
raw_insert_source
2943+
insert_source_fast_values
29392944
} else {
29402945
insert_source
29412946
};
@@ -3005,20 +3010,38 @@ pub fn insert_source(i: Input) -> IResult<InsertSource> {
30053010
)(i)
30063011
}
30073012

3008-
// `INSERT INTO ... VALUES` statement will
3009-
// stop the parser immediately and return the rest tokens in `InsertSource`.
3010-
//
3011-
// This is a hack to parse large insert statements.
3012-
pub fn raw_insert_source(i: Input) -> IResult<InsertSource> {
3013-
let streaming = map(
3013+
pub fn insert_source_file(i: Input) -> IResult<InsertSource> {
3014+
let value = map(
30143015
rule! {
3015-
#file_format_clause ~ (ON_ERROR ~ ^"=" ~ ^#ident)?
3016+
"(" ~ #comma_separated_list1(expr) ~ ")"
30163017
},
3017-
|(options, on_error_opt)| InsertSource::StreamingLoad {
3018+
|(_, values, _)| values,
3019+
);
3020+
map(
3021+
rule! {
3022+
VALUES ~ #value? ~ #file_format_clause ~ (ON_ERROR ~ ^"=" ~ ^#ident)?
3023+
},
3024+
|(_, value, options, on_error_opt)| InsertSource::StreamingLoad {
30183025
format_options: options,
30193026
on_error_mode: on_error_opt.map(|v| v.2.to_string()),
3027+
value,
30203028
},
3021-
);
3029+
)(i)
3030+
// TODO: support query later
3031+
// let query = map(query, |query| InsertSource::Select {
3032+
// query: Box::new(query),
3033+
// });
3034+
// rule!(
3035+
// #file
3036+
// | #query
3037+
// )(i)
3038+
}
3039+
3040+
// `INSERT INTO ... VALUES` statement will
3041+
// stop the parser immediately and return the rest tokens in `InsertSource`.
3042+
//
3043+
// This is a hack to parse large insert statements.
3044+
pub fn insert_source_fast_values(i: Input) -> IResult<InsertSource> {
30223045
let values = map(
30233046
rule! {
30243047
VALUES ~ #rest_str
@@ -3037,7 +3060,6 @@ pub fn raw_insert_source(i: Input) -> IResult<InsertSource> {
30373060
rule!(
30383061
#values
30393062
| #query
3040-
| #streaming
30413063
)(i)
30423064
}
30433065

src/query/ast/tests/it/parser.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1102,7 +1102,7 @@ fn test_raw_insert_stmt() {
11021102
];
11031103

11041104
for case in cases {
1105-
run_parser(file, insert_stmt(true), case);
1105+
run_parser(file, insert_stmt(true, false), case);
11061106
}
11071107
}
11081108

src/query/ast/tests/it/testdata/stmt-error.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ error:
133133
| ^^^^^^
134134
| |
135135
| INSERT statement must be followed by 'overwrite' or 'into'
136-
| while parsing `INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`
136+
| while parsing `INSERT INTO [TABLE] <table> [(<column>, ...)] (VALUES <values> | <query>)`
137137

138138

139139
---------- Input ----------
@@ -167,7 +167,7 @@ error:
167167
1 | insert into t format
168168
| ------ ^^^^^^ unexpected `format`, expecting `FROM`, `ORDER`, `LIMIT`, `OFFSET`, `IGNORE_RESULT`, `WITH`, `VALUES`, `EXCEPT`, `SELECT`, `INTERSECT`, `(`, `UNION`, or `.`
169169
| |
170-
| while parsing `INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`
170+
| while parsing `INSERT INTO [TABLE] <table> [(<column>, ...)] (VALUES <values> | <query>)`
171171

172172

173173
---------- Input ----------

0 commit comments

Comments
 (0)