
Commit 84c6d69

Merge pull request #9249 from everpcpc/feat-insert
feat(query): support attaching stage for insert values
2 parents 2943c22 + 0c3b970 commit 84c6d69

File tree: 10 files changed, +295 −5 lines changed
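
Taken together, the changes below let an HTTP client attach a staged file to a plain `INSERT INTO ... VALUES` statement: the request carries a `stage_attachment` object, the server resolves the stage location, applies any per-request format and copy options, and streams the file through the insert pipeline. A rough sketch of the request shape follows; the table name, stage name, and option values are illustrative assumptions, not taken from this commit.

```rust
use serde_json::json;

fn main() {
    // Hypothetical body for Databend's HTTP query endpoint. The
    // `stage_attachment` keys mirror the new StageAttachmentConf fields;
    // all option values are strings, matching BTreeMap<String, String>.
    let body = json!({
        "sql": "INSERT INTO t1 (a, b) VALUES",
        "stage_attachment": {
            "location": "@my_stage/data.csv",
            "format_options": { "format": "CSV", "skip_header": "1" },
            "copy_options": { "purge": "true" }
        }
    });
    println!("{}", serde_json::to_string_pretty(&body).unwrap());
}
```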

scripts/ci/deploy/config/databend-query-node-1.toml
Lines changed: 1 addition & 1 deletion

```diff
@@ -60,7 +60,7 @@ table_cache_bloom_index_data_bytes=1073741824
 [log]
 
 [log.file]
-level = "ERROR"
+level = "WARN"
 format = "text"
 dir = "./.databend/logs_1"
```

src/meta/types/src/user_stage.rs
Lines changed: 85 additions & 0 deletions

```diff
@@ -12,11 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::fmt;
 use std::str::FromStr;
 
 use chrono::DateTime;
 use chrono::Utc;
+use common_exception::ErrorCode;
+use common_exception::Result;
 use common_io::consts::NAN_BYTES_SNAKE;
 use common_storage::StorageParams;
 
@@ -338,6 +341,88 @@ impl UserStageInfo {
             StageType::User => format!("/stage/user/{}/", self.stage_name),
         }
     }
+
+    /// Apply the file format options.
+    pub fn apply_format_options(&mut self, opts: &BTreeMap<String, String>) -> Result<()> {
+        if opts.is_empty() {
+            return Ok(());
+        }
+        for (k, v) in opts.iter() {
+            match k.as_str() {
+                "format" => {
+                    let format = StageFileFormatType::from_str(v)?;
+                    self.file_format_options.format = format;
+                }
+                "skip_header" => {
+                    let skip_header = u64::from_str(v)?;
+                    self.file_format_options.skip_header = skip_header;
+                }
+                "field_delimiter" => self.file_format_options.field_delimiter = v.clone(),
+                "record_delimiter" => self.file_format_options.record_delimiter = v.clone(),
+                "nan_display" => self.file_format_options.nan_display = v.clone(),
+                "escape" => self.file_format_options.escape = v.clone(),
+                "compression" => {
+                    let compression = StageFileCompression::from_str(v)?;
+                    self.file_format_options.compression = compression;
+                }
+                "row_tag" => self.file_format_options.row_tag = v.clone(),
+                "quote" => self.file_format_options.quote = v.clone(),
+                _ => {
+                    return Err(ErrorCode::BadArguments(format!(
+                        "Unknown stage file format option {}",
+                        k
+                    )));
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Apply the copy options.
+    pub fn apply_copy_options(&mut self, opts: &BTreeMap<String, String>) -> Result<()> {
+        if opts.is_empty() {
+            return Ok(());
+        }
+        for (k, v) in opts.iter() {
+            match k.as_str() {
+                "on_error" => {
+                    let on_error = OnErrorMode::from_str(v)?;
+                    self.copy_options.on_error = on_error;
+                }
+                "size_limit" => {
+                    let size_limit = usize::from_str(v)?;
+                    self.copy_options.size_limit = size_limit;
+                }
+                "split_size" => {
+                    let split_size = usize::from_str(v)?;
+                    self.copy_options.split_size = split_size;
+                }
+                "purge" => {
+                    let purge = bool::from_str(v).map_err(|_| {
+                        ErrorCode::StrParseError(format!("Cannot parse purge: {} as bool", v))
+                    })?;
+                    self.copy_options.purge = purge;
+                }
+                "single" => {
+                    let single = bool::from_str(v).map_err(|_| {
+                        ErrorCode::StrParseError(format!("Cannot parse single: {} as bool", v))
+                    })?;
+                    self.copy_options.single = single;
+                }
+                "max_file_size" => {
+                    let max_file_size = usize::from_str(v)?;
+                    self.copy_options.max_file_size = max_file_size;
+                }
+                _ => {
+                    return Err(ErrorCode::BadArguments(format!(
+                        "Unknown stage copy option {}",
+                        k
+                    )));
+                }
+            }
+        }
+        Ok(())
+    }
 }
 
 #[derive(Default, Debug, Clone, PartialEq, Eq)]
```
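
Both new methods use the same dispatch pattern: iterate over the string map, parse each known key into its typed field, and reject unknown keys with `BadArguments`. Keeping the wire format as `BTreeMap<String, String>` lets the HTTP layer stay schema-agnostic while parsing happens next to the typed options. Below is a minimal self-contained sketch of that pattern, using a simplified stand-in struct rather than the real `UserStageInfo` API.

```rust
use std::collections::BTreeMap;
use std::str::FromStr;

// Simplified stand-in for the real copy options; field names follow the
// diff above, but this struct is illustrative only.
#[derive(Debug, Default)]
struct CopyOptions {
    size_limit: usize,
    purge: bool,
}

// Mirrors the dispatch pattern of apply_copy_options: each known key is
// parsed into its typed field, unknown keys are rejected.
fn apply_copy_options(opts: &BTreeMap<String, String>, out: &mut CopyOptions) -> Result<(), String> {
    for (k, v) in opts {
        match k.as_str() {
            "size_limit" => out.size_limit = usize::from_str(v).map_err(|e| e.to_string())?,
            "purge" => {
                out.purge = bool::from_str(v)
                    .map_err(|_| format!("Cannot parse purge: {} as bool", v))?
            }
            other => return Err(format!("Unknown stage copy option {}", other)),
        }
    }
    Ok(())
}

fn main() {
    let mut opts = BTreeMap::new();
    opts.insert("size_limit".to_string(), "100".to_string());
    opts.insert("purge".to_string(), "true".to_string());

    let mut copy = CopyOptions::default();
    apply_copy_options(&opts, &mut copy).unwrap();
    assert_eq!(copy.size_limit, 100);
    assert!(copy.purge);
}
```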

src/query/catalog/src/table_context.rs
Lines changed: 9 additions & 0 deletions

```diff
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::net::SocketAddr;
 use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
@@ -54,6 +55,13 @@ pub struct ProcessInfo {
     pub created_time: SystemTime,
 }
 
+#[derive(Debug, Clone)]
+pub struct StageAttachment {
+    pub location: String,
+    pub format_options: BTreeMap<String, String>,
+    pub copy_options: BTreeMap<String, String>,
+}
+
 #[async_trait::async_trait]
 pub trait TableContext: Send + Sync {
     /// Build a table instance the plan wants to operate on.
@@ -99,4 +107,5 @@ pub trait TableContext: Send + Sync {
     async fn get_table(&self, catalog: &str, database: &str, table: &str)
         -> Result<Arc<dyn Table>>;
     fn get_processes_info(&self) -> Vec<ProcessInfo>;
+    fn get_stage_attachment(&self) -> Option<StageAttachment>;
 }
```
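
`StageAttachment` is the plain data carrier handed from the HTTP layer to the interpreter via `attach_stage` and `get_stage_attachment`. A small sketch of constructing one follows; the struct shape is copied from the diff, while the location and option values are illustrative.

```rust
use std::collections::BTreeMap;

// Shape copied from the new StageAttachment struct in the diff above.
#[derive(Debug, Clone)]
pub struct StageAttachment {
    pub location: String,
    pub format_options: BTreeMap<String, String>,
    pub copy_options: BTreeMap<String, String>,
}

fn main() {
    // Building an attachment the way the HTTP layer does when a request
    // carries a `stage_attachment` field; values here are made up.
    let mut format_options = BTreeMap::new();
    format_options.insert("format".to_string(), "CSV".to_string());

    let attachment = StageAttachment {
        location: "@my_stage/data.csv".to_string(),
        format_options,
        copy_options: BTreeMap::new(),
    };
    println!("{:?}", attachment);
}
```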

src/query/service/src/interpreters/interpreter_insert_v2.rs
Lines changed: 137 additions & 0 deletions

```diff
@@ -17,14 +17,17 @@ use std::io::BufRead;
 use std::io::Cursor;
 use std::ops::Not;
 use std::sync::Arc;
+use std::time::Instant;
 
 use aho_corasick::AhoCorasick;
 use common_ast::ast::Expr;
 use common_ast::parser::parse_comma_separated_exprs;
 use common_ast::parser::tokenize_sql;
 use common_ast::Backtrace;
 use common_base::runtime::GlobalIORuntime;
+use common_catalog::plan::StageTableInfo;
 use common_catalog::table::AppendMode;
+use common_catalog::table_context::StageAttachment;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 use common_exception::ErrorCode;
@@ -33,19 +36,26 @@ use common_formats::parse_timezone;
 use common_formats::FastFieldDecoderValues;
 use common_io::cursor_ext::ReadBytesExt;
 use common_io::cursor_ext::ReadCheckPointExt;
+use common_meta_types::UserStageInfo;
+use common_pipeline_core::Pipeline;
 use common_pipeline_sources::processors::sources::AsyncSource;
 use common_pipeline_sources::processors::sources::AsyncSourcer;
 use common_pipeline_transforms::processors::transforms::Transform;
 use common_sql::evaluator::ChunkOperator;
 use common_sql::evaluator::CompoundChunkOperator;
+use common_sql::executor::table_read_plan::ToReadDataSourcePlan;
 use common_sql::Metadata;
 use common_sql::MetadataRef;
+use common_storages_factory::Table;
+use common_storages_stage::StageTable;
+use common_users::UserApiProvider;
 use parking_lot::Mutex;
 use parking_lot::RwLock;
 
 use crate::interpreters::common::append2table;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterPtr;
+use crate::pipelines::processors::TransformAddOn;
 use crate::pipelines::PipelineBuildResult;
 use crate::pipelines::SourcePipeBuilder;
 use crate::schedulers::build_query_pipeline;
@@ -101,6 +111,100 @@ impl InsertInterpreterV2 {
         let cast_needed = select_schema != *output_schema;
         Ok(cast_needed)
     }
+
+    async fn build_insert_from_stage_pipeline(
+        &self,
+        table: Arc<dyn Table>,
+        attachment: Arc<StageAttachment>,
+        pipeline: &mut Pipeline,
+    ) -> Result<()> {
+        let start = Instant::now();
+        let ctx = self.ctx.clone();
+        let table_ctx: Arc<dyn TableContext> = ctx.clone();
+        let source_schema = self.plan.schema();
+        let target_schema = table.schema();
+        let catalog_name = self.plan.catalog.clone();
+        let overwrite = self.plan.overwrite;
+
+        let (mut stage_info, path) = parse_stage_location(&self.ctx, &attachment.location).await?;
+        stage_info.apply_format_options(&attachment.format_options)?;
+        stage_info.apply_copy_options(&attachment.copy_options)?;
+
+        let mut stage_table_info = StageTableInfo {
+            schema: source_schema.clone(),
+            user_stage_info: stage_info,
+            path: path.to_string(),
+            files: vec![],
+            pattern: "".to_string(),
+            files_to_copy: None,
+        };
+
+        let all_source_file_infos = StageTable::list_files(&table_ctx, &stage_table_info).await?;
+
+        tracing::info!(
+            "insert: read all stage attachment files finished: {}, elapsed:{}",
+            all_source_file_infos.len(),
+            start.elapsed().as_secs()
+        );
+
+        stage_table_info.files_to_copy = Some(all_source_file_infos.clone());
+        let stage_table = StageTable::try_create(stage_table_info.clone())?;
+        let read_source_plan = {
+            stage_table
+                .read_plan_with_catalog(ctx.clone(), catalog_name, None)
+                .await?
+        };
+
+        stage_table.read_data(table_ctx, &read_source_plan, pipeline)?;
+
+        let need_fill_missing_columns = target_schema != source_schema;
+        if need_fill_missing_columns {
+            pipeline.add_transform(|transform_input_port, transform_output_port| {
+                TransformAddOn::try_create(
+                    transform_input_port,
+                    transform_output_port,
+                    source_schema.clone(),
+                    target_schema.clone(),
+                    ctx.clone(),
+                )
+            })?;
+        }
+        table.append_data(ctx.clone(), pipeline, AppendMode::Copy, false)?;
+
+        pipeline.set_on_finished(move |may_error| {
+            // capture out variable
+            let overwrite = overwrite;
+            let ctx = ctx.clone();
+            let table = table.clone();
+
+            match may_error {
+                Some(error) => {
+                    tracing::error!("insert stage file error: {}", error);
+                    Err(may_error.as_ref().unwrap().clone())
+                }
+                None => {
+                    let append_entries = ctx.consume_precommit_blocks();
+                    // We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
+                    GlobalIORuntime::instance().block_on(async move {
+                        tracing::info!(
+                            "insert: try to commit append entries:{}, elapsed:{}",
+                            append_entries.len(),
+                            start.elapsed().as_secs()
+                        );
+                        table
+                            .commit_insertion(ctx, append_entries, overwrite)
+                            .await?;
+
+                        // TODO:(everpcpc) purge copied files
+
+                        Ok(())
+                    })
+                }
+            }
+        });
+
+        Ok(())
+    }
 }
 
 #[async_trait::async_trait]
@@ -156,6 +260,16 @@ impl Interpreter for InsertInterpreterV2 {
                     .format
                     .exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
             }
+            InsertInputSource::Stage(opts) => {
+                tracing::info!("insert: from stage with options {:?}", opts);
+                self.build_insert_from_stage_pipeline(
+                    table.clone(),
+                    opts.clone(),
+                    &mut build_res.main_pipeline,
+                )
+                .await?;
+                return Ok(build_res);
+            }
             InsertInputSource::SelectPlan(plan) => {
                 let table1 = table.clone();
                 let (mut select_plan, select_column_bindings) = match plan.as_ref() {
@@ -602,3 +716,26 @@ async fn exprs_to_datavalue<'a>(
     let datavalues: Vec<DataValue> = res.columns().iter().skip(1).map(|col| col.get(0)).collect();
     Ok(datavalues)
 }
+
+// TODO:(everpcpc) tmp copy from src/query/sql/src/planner/binder/copy.rs
+// move to user stage module
+async fn parse_stage_location(
+    ctx: &Arc<QueryContext>,
+    location: &str,
+) -> Result<(UserStageInfo, String)> {
+    let s: Vec<&str> = location.split('@').collect();
+    // @my_ext_stage/abc/
+    let names: Vec<&str> = s[1].splitn(2, '/').filter(|v| !v.is_empty()).collect();
+
+    let stage = if names[0] == "~" {
+        UserStageInfo::new_user_stage(&ctx.get_current_user()?.name)
+    } else {
+        UserApiProvider::instance()
+            .get_stage(&ctx.get_tenant(), names[0])
+            .await?
+    };
+
+    let path = names.get(1).unwrap_or(&"").trim_start_matches('/');
+
+    Ok((stage, path.to_string()))
+}
```
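
The location syntax accepted by `parse_stage_location` is `@stage_name/path`, or `@~/path` for the current user's stage. The following standalone sketch isolates just the string handling, returning the stage name and path rather than a resolved `UserStageInfo`.

```rust
// Standalone sketch of the location parsing done by parse_stage_location
// above; stage resolution against UserApiProvider is omitted.
fn split_stage_location(location: &str) -> (String, String) {
    // e.g. "@my_ext_stage/abc/def.csv" -> ("my_ext_stage", "abc/def.csv")
    let s: Vec<&str> = location.split('@').collect();
    let names: Vec<&str> = s[1].splitn(2, '/').filter(|v| !v.is_empty()).collect();
    let path = names.get(1).unwrap_or(&"").trim_start_matches('/');
    (names[0].to_string(), path.to_string())
}

fn main() {
    assert_eq!(
        split_stage_location("@my_ext_stage/abc/def.csv"),
        ("my_ext_stage".to_string(), "abc/def.csv".to_string())
    );
    // "~" selects the current user's stage in the real code.
    assert_eq!(
        split_stage_location("@~/data.csv"),
        ("~".to_string(), "data.csv".to_string())
    );
}
```

As in the original, the sketch indexes `s[1]` and `names[0]` directly, so a location without `@`, or a bare `@`, would panic; the TODO in the diff suggests this helper is slated to move into a shared user-stage module, where such cases could be hardened.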

src/query/service/src/servers/http/v1/query/http_query.rs
Lines changed: 26 additions & 0 deletions

```diff
@@ -21,6 +21,7 @@ use common_base::base::tokio;
 use common_base::base::tokio::sync::Mutex as TokioMutex;
 use common_base::base::tokio::sync::RwLock;
 use common_base::runtime::TrySpawn;
+use common_catalog::table_context::StageAttachment;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use serde::Deserialize;
@@ -59,6 +60,7 @@ pub struct HttpQueryRequest {
     pub pagination: PaginationConf,
     #[serde(default = "default_as_true")]
     pub string_fields: bool,
+    pub stage_attachment: Option<StageAttachmentConf>,
 }
 
 const DEFAULT_MAX_ROWS_IN_BUFFER: usize = 5 * 1000 * 1000;
@@ -141,6 +143,15 @@ impl HttpSessionConf {
     }
 }
 
+#[derive(Deserialize, Debug, Clone)]
+pub struct StageAttachmentConf {
+    /// location of the stage
+    /// for example: @stage_name/path/to/file, @~/path/to/file
+    pub(crate) location: String,
+    pub(crate) format_options: Option<BTreeMap<String, String>>,
+    pub(crate) copy_options: Option<BTreeMap<String, String>>,
+}
+
 #[derive(Debug, Clone)]
 pub struct ResponseState {
     pub running_time_ms: f64,
@@ -229,6 +240,21 @@ impl HttpQuery {
         let sql = &request.sql;
         tracing::info!("run query_id={id} in session_id={session_id}, sql='{sql}'");
 
+        match &request.stage_attachment {
+            Some(attachment) => ctx.attach_stage(StageAttachment {
+                location: attachment.location.clone(),
+                format_options: match attachment.format_options {
+                    Some(ref params) => params.clone(),
+                    None => BTreeMap::new(),
+                },
+                copy_options: match attachment.copy_options {
+                    Some(ref params) => params.clone(),
+                    None => BTreeMap::new(),
+                },
+            }),
+            None => {}
+        };
+
         let (block_sender, block_receiver) = sized_spsc(request.pagination.max_rows_in_buffer);
         let start_time = Instant::now();
         let state = Arc::new(RwLock::new(Executor {
```
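
On the wire, `stage_attachment` is optional and both option maps may be omitted; missing maps become empty `BTreeMap`s in the new `match` inside `impl HttpQuery`. A self-contained sketch of how such a payload deserializes, using a local stand-in that mirrors `StageAttachmentConf`:

```rust
use std::collections::BTreeMap;

use serde::Deserialize;

// Local stand-in mirroring StageAttachmentConf from the diff above,
// used here only to show how a request payload deserializes.
#[derive(Deserialize, Debug)]
struct StageAttachmentConf {
    location: String,
    format_options: Option<BTreeMap<String, String>>,
    copy_options: Option<BTreeMap<String, String>>,
}

fn main() {
    // Illustrative payload; `copy_options` is intentionally omitted.
    let payload = r#"{
        "location": "@my_stage/data.csv",
        "format_options": { "skip_header": "1" }
    }"#;
    let conf: StageAttachmentConf = serde_json::from_str(payload).unwrap();
    assert_eq!(conf.location, "@my_stage/data.csv");
    // A missing map deserializes to None; the server then substitutes
    // an empty BTreeMap when building the StageAttachment.
    assert!(conf.copy_options.is_none());
    println!("{:?}", conf);
}
```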

0 commit comments