Skip to content

Commit 4a069ae

Browse files
authored
feat: insert with stage support from and file_format clause. (#18256)
* feat: insert with stage support from and file_format clause. * update tests.
1 parent 355a082 commit 4a069ae

19 files changed

+240
-98
lines changed

src/query/ast/src/ast/statements/insert.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,13 @@ pub enum InsertSource {
8282
Select {
8383
query: Box<Query>,
8484
},
85-
StreamingLoad {
85+
LoadFile {
8686
format_options: FileFormatOptions,
87-
on_error_mode: Option<String>,
8887
value: Option<Vec<Expr>>,
88+
89+
// '_databend_upload' => read from streaming upload handler body
90+
// _ => read from stage
91+
location: String,
8992
},
9093
}
9194

@@ -106,22 +109,19 @@ impl Display for InsertSource {
106109
}
107110
InsertSource::RawValues { rest_str, .. } => write!(f, "VALUES {rest_str}"),
108111
InsertSource::Select { query } => write!(f, "{query}"),
109-
InsertSource::StreamingLoad {
112+
InsertSource::LoadFile {
110113
value,
111114
format_options,
112-
on_error_mode,
115+
location,
113116
} => {
114117
if let Some(value) = value {
115118
write!(f, "(")?;
116119
write_comma_separated_list(f, value)?;
117120
write!(f, ")")?;
118121
}
122+
write!(f, " FROM @{location}",)?;
119123
write!(f, " FILE_FORMAT = ({})", format_options)?;
120-
write!(
121-
f,
122-
" ON_ERROR = '{}'",
123-
on_error_mode.as_ref().unwrap_or(&"Abort".to_string())
124-
)
124+
Ok(())
125125
}
126126
}
127127
}

src/query/ast/src/parser/statement.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2965,22 +2965,14 @@ pub fn insert_source_file(i: Input) -> IResult<InsertSource> {
29652965
);
29662966
map(
29672967
rule! {
2968-
VALUES ~ #value? ~ #file_format_clause ~ (ON_ERROR ~ ^"=" ~ ^#ident)?
2968+
(VALUES ~ #value?)? ~ FROM ~ #at_string ~ #file_format_clause
29692969
},
2970-
|(_, value, options, on_error_opt)| InsertSource::StreamingLoad {
2971-
format_options: options,
2972-
on_error_mode: on_error_opt.map(|v| v.2.to_string()),
2973-
value,
2970+
|(values, _, location, format_options)| InsertSource::LoadFile {
2971+
value: values.map(|(_, value)| value).unwrap_or_default(),
2972+
location,
2973+
format_options,
29742974
},
29752975
)(i)
2976-
// TODO: support query later
2977-
// let query = map(query, |query| InsertSource::Select {
2978-
// query: Box::new(query),
2979-
// });
2980-
// rule!(
2981-
// #file
2982-
// | #query
2983-
// )(i)
29842976
}
29852977

29862978
// `INSERT INTO ... VALUES` statement will
@@ -3004,6 +2996,7 @@ pub fn insert_source_fast_values(i: Input) -> IResult<InsertSource> {
30042996
);
30052997

30062998
rule!(
2999+
#insert_source_file |
30073000
#values
30083001
| #query
30093002
)(i)

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,6 @@ impl Interpreter for InsertInterpreter {
259259
plan.required_source_schema.clone(),
260260
plan.default_exprs.clone(),
261261
plan.block_thresholds,
262-
plan.on_error_mode.clone(),
263262
)?;
264263
if !plan.values_consts.is_empty() {
265264
let input_schema = Arc::new(DataSchema::from(&plan.required_source_schema));

src/query/sql/src/planner/binder/copy_into_table.rs

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ use log::warn;
6868
use parking_lot::RwLock;
6969

7070
use crate::binder::bind_query::MaxColumnPosition;
71+
use crate::binder::insert::STAGE_PLACEHOLDER;
7172
use crate::binder::location::parse_uri_location;
7273
use crate::binder::Binder;
7374
use crate::plans::CopyIntoTableMode;
@@ -358,19 +359,52 @@ impl Binder {
358359
values_str: &str,
359360
write_mode: CopyIntoTableMode,
360361
) -> Result<Plan> {
361-
let (required_source_schema, const_columns) = if values_str.is_empty() {
362-
(required_values_schema.clone(), vec![])
362+
let expr_or_placeholders = if values_str.is_empty() {
363+
None
363364
} else {
364-
self.prepared_values_str(values_str, &required_values_schema)
365-
.await?
365+
Some(self.parse_values_str(values_str).await?)
366366
};
367367

368+
let (stage_info, files_info, options) = self.bind_attachment(attachment).await?;
369+
self.bind_copy_from_upload(
370+
bind_context,
371+
catalog_name,
372+
database_name,
373+
table_name,
374+
required_values_schema,
375+
expr_or_placeholders,
376+
stage_info,
377+
files_info,
378+
options,
379+
write_mode,
380+
)
381+
.await
382+
}
383+
384+
#[allow(clippy::too_many_arguments)]
385+
#[async_backtrace::framed]
386+
pub(crate) async fn bind_copy_from_upload(
387+
&mut self,
388+
bind_context: &mut BindContext,
389+
catalog_name: String,
390+
database_name: String,
391+
table_name: String,
392+
required_values_schema: TableSchemaRef,
393+
expr_or_placeholders: Option<Vec<Expr>>,
394+
stage_info: StageInfo,
395+
files_info: StageFilesInfo,
396+
copy_into_table_options: CopyIntoTableOptions,
397+
write_mode: CopyIntoTableMode,
398+
) -> Result<Plan> {
368399
let catalog = self.ctx.get_catalog(&catalog_name).await?;
369400
let catalog_info = catalog.info();
370-
371-
let thread_num = self.ctx.get_settings().get_max_threads()? as usize;
372-
373-
let (stage_info, files_info, options) = self.bind_attachment(attachment).await?;
401+
let settings = self.ctx.get_settings();
402+
let (required_source_schema, values_consts) = if let Some(exprs) = expr_or_placeholders {
403+
self.prepared_values(exprs, &required_values_schema, settings.clone())
404+
.await?
405+
} else {
406+
(required_values_schema.clone(), vec![])
407+
};
374408

375409
// list the files to be copied in binding phase
376410
// note that, this method(`bind_copy_from_attachment`) are used by
@@ -379,6 +413,7 @@ impl Binder {
379413
// currently, they do NOT enforce the deduplication detection rules,
380414
// as the vanilla Copy-Into does.
381415
// thus, we do not care about the "duplicated_files_detected", just set it to empty vector.
416+
let thread_num = settings.get_max_threads()? as usize;
382417
let files_to_copy = list_stage_files(&stage_info, &files_info, thread_num, None).await?;
383418
let duplicated_files_detected = vec![];
384419

@@ -396,7 +431,7 @@ impl Binder {
396431
required_values_schema,
397432
dedup_full_path: false,
398433
path_prefix: None,
399-
values_consts: const_columns,
434+
values_consts,
400435
stage_table_info: StageTableInfo {
401436
schema: required_source_schema,
402437
files_info,
@@ -405,7 +440,7 @@ impl Binder {
405440
duplicated_files_detected,
406441
is_select: false,
407442
default_exprs: Some(default_values),
408-
copy_into_table_options: options,
443+
copy_into_table_options,
409444
stage_root: "".to_string(),
410445
is_variant: false,
411446
},
@@ -526,17 +561,12 @@ impl Binder {
526561
Ok(Plan::CopyIntoTable(Box::new(plan)))
527562
}
528563

529-
pub(crate) async fn prepared_values_str(
530-
&self,
531-
values_str: &str,
532-
source_schema: &TableSchemaRef,
533-
) -> Result<(TableSchemaRef, Vec<Scalar>)> {
564+
pub(crate) async fn parse_values_str(&self, values_str: &str) -> Result<Vec<Expr>> {
534565
let settings = self.ctx.get_settings();
535566
let sql_dialect = settings.get_sql_dialect()?;
536567
let tokens = tokenize_sql(values_str)?;
537-
let expr_or_placeholders = parse_values(&tokens, sql_dialect)?;
538-
self.prepared_values(expr_or_placeholders, source_schema, settings)
539-
.await
568+
let values = parse_values(&tokens, sql_dialect)?;
569+
Ok(values)
540570
}
541571

542572
#[async_backtrace::framed]
@@ -660,6 +690,9 @@ pub async fn resolve_stage_location(
660690
) -> Result<(StageInfo, String)> {
661691
// my_named_stage/abc/
662692
let names: Vec<&str> = location.splitn(2, '/').filter(|v| !v.is_empty()).collect();
693+
if names[0] == STAGE_PLACEHOLDER {
694+
return Err(ErrorCode::BadArguments("placeholder @_databend_upload should be used in streaming_load handler or replaced in client."));
695+
}
663696

664697
let stage = if names[0] == "~" {
665698
StageInfo::new_user_stage(&ctx.get_current_user()?.name)

src/query/sql/src/planner/binder/ddl/stage.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use databend_common_meta_app::principal::StageInfo;
2323
use databend_common_storage::init_operator;
2424

2525
use super::super::copy_into_table::resolve_stage_location;
26+
use crate::binder::insert::STAGE_PLACEHOLDER;
2627
use crate::binder::location::parse_storage_params_from_uri;
2728
use crate::binder::Binder;
2829
use crate::plans::CreateStagePlan;
@@ -59,6 +60,12 @@ impl Binder {
5960
comments: _,
6061
} = stmt;
6162

63+
if stage_name.to_lowercase() == STAGE_PLACEHOLDER {
64+
return Err(ErrorCode::InvalidArgument(format!(
65+
"Can not create stage with name `{STAGE_PLACEHOLDER}`, which is reserved"
66+
)));
67+
}
68+
6269
let mut stage_info = match location {
6370
None => {
6471
if stage_name == "~" {

src/query/sql/src/planner/binder/insert.rs

Lines changed: 72 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::str::FromStr;
1615
use std::sync::Arc;
1716

17+
use databend_common_ast::ast::CopyIntoTableOptions;
1818
use databend_common_ast::ast::Identifier;
1919
use databend_common_ast::ast::InsertSource;
2020
use databend_common_ast::ast::InsertStmt;
21-
use databend_common_ast::ast::OnErrorMode;
2221
use databend_common_ast::ast::Statement;
22+
use databend_common_catalog::session_type::SessionType;
2323
use databend_common_exception::ErrorCode;
2424
use databend_common_exception::Result;
2525
use databend_common_expression::DataSchemaRef;
@@ -29,8 +29,10 @@ use databend_common_expression::TableSchema;
2929
use databend_common_expression::TableSchemaRefExt;
3030
use databend_common_meta_app::principal::FileFormatOptionsReader;
3131
use databend_common_meta_app::principal::FileFormatParams;
32+
use databend_common_storage::StageFilesInfo;
3233

3334
use super::util::TableIdentifier;
35+
use crate::binder::resolve_stage_location;
3436
use crate::binder::Binder;
3537
use crate::normalize_identifier;
3638
use crate::plans::CopyIntoTableMode;
@@ -42,6 +44,7 @@ use crate::plans::StreamingLoadPlan;
4244
use crate::BindContext;
4345
use crate::DefaultExprBinder;
4446

47+
pub const STAGE_PLACEHOLDER: &str = "_databend_load";
4548
impl Binder {
4649
pub fn schema_project(
4750
&self,
@@ -178,44 +181,81 @@ impl Binder {
178181
let select_plan = self.bind_statement(bind_context, &statement).await?;
179182
Ok(InsertInputSource::SelectPlan(Box::new(select_plan)))
180183
}
181-
InsertSource::StreamingLoad {
184+
InsertSource::LoadFile {
182185
value,
183186
format_options,
184-
on_error_mode,
187+
location,
185188
} => {
186189
let settings = self.ctx.get_settings();
187-
let (required_source_schema, values_consts) = if let Some(value) = value {
188-
self.prepared_values(value, &schema, settings).await?
189-
} else {
190-
(schema.clone(), vec![])
191-
};
192190
let file_format_params = FileFormatParams::try_from_reader(
193191
FileFormatOptionsReader::from_ast(&format_options),
194192
false,
195193
)?;
196-
let required_values_schema: DataSchemaRef = Arc::new(schema.clone().into());
197-
198-
let default_exprs = if file_format_params.need_field_default() {
199-
Some(
200-
DefaultExprBinder::try_new(self.ctx.clone())?
201-
.prepare_default_values(&required_values_schema)?,
202-
)
203-
} else {
204-
None
205-
};
206-
Ok(InsertInputSource::StreamingLoad(StreamingLoadPlan {
207-
file_format: Box::new(file_format_params),
208-
on_error_mode: OnErrorMode::from_str(
209-
&on_error_mode.unwrap_or("abort".to_string()),
210-
)?,
211-
required_values_schema,
212-
values_consts,
213-
block_thresholds: table.get_block_thresholds(),
214-
default_exprs,
215-
// fill it in HTTP handler
216-
receiver: Default::default(),
217-
required_source_schema,
218-
}))
194+
match location.as_str() {
195+
STAGE_PLACEHOLDER => {
196+
if self.ctx.get_session_type() != SessionType::HTTPStreamingLoad {
197+
return Err(ErrorCode::BadArguments("placeholder @_databend_upload should be used in streaming_load handler or replaced in client."));
198+
}
199+
let (required_source_schema, values_consts) = if let Some(value) = value {
200+
self.prepared_values(value, &schema, settings).await?
201+
} else {
202+
(schema.clone(), vec![])
203+
};
204+
205+
let required_values_schema: DataSchemaRef = Arc::new(schema.clone().into());
206+
207+
let default_exprs = if file_format_params.need_field_default() {
208+
Some(
209+
DefaultExprBinder::try_new(self.ctx.clone())?
210+
.prepare_default_values(&required_values_schema)?,
211+
)
212+
} else {
213+
None
214+
};
215+
Ok(InsertInputSource::StreamingLoad(StreamingLoadPlan {
216+
file_format: Box::new(file_format_params),
217+
required_values_schema,
218+
values_consts,
219+
block_thresholds: table.get_block_thresholds(),
220+
default_exprs,
221+
// fill it in HTTP handler
222+
receiver: Default::default(),
223+
required_source_schema,
224+
}))
225+
}
226+
loc => {
227+
let (mut stage_info, path) =
228+
resolve_stage_location(self.ctx.as_ref(), loc).await?;
229+
stage_info.file_format_params = file_format_params;
230+
231+
let files_info = StageFilesInfo {
232+
path,
233+
files: None,
234+
pattern: None,
235+
};
236+
let options = CopyIntoTableOptions {
237+
purge: true,
238+
force: true,
239+
..Default::default()
240+
};
241+
return self
242+
.bind_copy_from_upload(
243+
bind_context,
244+
catalog_name,
245+
database_name,
246+
table_name,
247+
schema,
248+
value,
249+
stage_info,
250+
files_info,
251+
options,
252+
CopyIntoTableMode::Insert {
253+
overwrite: *overwrite,
254+
},
255+
)
256+
.await;
257+
}
258+
}
219259
}
220260
};
221261

src/query/sql/src/planner/binder/replace.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ impl Binder {
139139
let select_plan = self.bind_statement(bind_context, &statement).await?;
140140
Ok(InsertInputSource::SelectPlan(Box::new(select_plan)))
141141
}
142-
InsertSource::StreamingLoad { .. } => Err(ErrorCode::Unimplemented(
142+
InsertSource::LoadFile { .. } => Err(ErrorCode::Unimplemented(
143143
"Replace with streaming load not supported yet.",
144144
)),
145145
};

0 commit comments

Comments
 (0)