From 76a228c9fff959c07fb3bf819c5e82f06bc788f5 Mon Sep 17 00:00:00 2001
From: everpcpc
Date: Wed, 14 Dec 2022 15:00:16 +0800
Subject: [PATCH 1/6] feat(query): add argument SideloadConf to /v1/query

---
 src/query/service/src/servers/http/v1/query/http_query.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs
index 84cb50907a846..9fd05a26b19e0 100644
--- a/src/query/service/src/servers/http/v1/query/http_query.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query.rs
@@ -59,6 +59,7 @@ pub struct HttpQueryRequest {
     pub pagination: PaginationConf,
     #[serde(default = "default_as_true")]
     pub string_fields: bool,
+    pub sideload: Option<SideloadConf>,
 }
 
 const DEFAULT_MAX_ROWS_IN_BUFFER: usize = 5 * 1000 * 1000;
@@ -141,6 +142,12 @@ impl HttpSessionConf {
     }
 }
 
+#[derive(Deserialize, Debug, Clone)]
+pub struct SideloadConf {
+    pub(crate) stage: Option<String>,
+    pub(crate) url: Option<String>,
+}
+
 #[derive(Debug, Clone)]
 pub struct ResponseState {
     pub running_time_ms: f64,

From 805db705425bbc7df05c0b16f53a84d101f8e5ed Mon Sep 17 00:00:00 2001
From: everpcpc
Date: Wed, 14 Dec 2022 19:43:56 +0800
Subject: [PATCH 2/6] feat(query): support sideload data for insert values

---
 .../deploy/config/databend-query-node-1.toml  |   2 +-
 src/query/catalog/src/table_context.rs        |   7 +
 .../src/interpreters/interpreter_insert_v2.rs | 160 ++++++++++++++++++
 .../src/servers/http/v1/query/http_query.rs   |   9 +
 src/query/service/src/sessions/query_ctx.rs   |  10 ++
 .../service/src/sessions/query_ctx_shared.rs  |  12 ++
 .../it/storages/fuse/operations/commit.rs     |   5 +
 src/query/sql/src/planner/binder/insert.rs    |  11 +-
 src/query/sql/src/planner/plans/insert.rs     |   3 +
 9 files changed, 214 insertions(+), 5 deletions(-)

diff --git a/scripts/ci/deploy/config/databend-query-node-1.toml b/scripts/ci/deploy/config/databend-query-node-1.toml
index f9df888b94ba9..8b89f012db744 100644
--- a/scripts/ci/deploy/config/databend-query-node-1.toml
+++ b/scripts/ci/deploy/config/databend-query-node-1.toml
@@ -60,7 +60,7 @@ table_cache_bloom_index_data_bytes=1073741824
 [log]
 
 [log.file]
-level = "ERROR"
+level = "WARN"
 format = "text"
 dir = "./.databend/logs_1"
 
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index a77489f494804..25ecf4ea145bd 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -54,6 +54,12 @@ pub struct ProcessInfo {
     pub created_time: SystemTime,
 }
 
+#[derive(Debug, Clone)]
+pub struct SideloadOptions {
+    pub uri: Option<String>,
+    pub stage: Option<String>,
+}
+
 #[async_trait::async_trait]
 pub trait TableContext: Send + Sync {
     /// Build a table instance the plan wants to operate on.
@@ -99,4 +105,5 @@ pub trait TableContext: Send + Sync {
     async fn get_table(&self, catalog: &str, database: &str, table: &str) -> Result<Arc<dyn Table>>;
     fn get_processes_info(&self) -> Vec<ProcessInfo>;
+    fn get_sideload(&self) -> Option<SideloadOptions>;
 }
diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs
index c45f1c5bdb17d..b39a1b08e8920 100644
--- a/src/query/service/src/interpreters/interpreter_insert_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs
@@ -17,6 +17,7 @@ use std::io::BufRead;
 use std::io::Cursor;
 use std::ops::Not;
 use std::sync::Arc;
+use std::time::Instant;
 
 use aho_corasick::AhoCorasick;
 use common_ast::ast::Expr;
@@ -24,6 +25,8 @@ use common_ast::parser::parse_comma_separated_exprs;
 use common_ast::parser::tokenize_sql;
 use common_ast::Backtrace;
 use common_base::runtime::GlobalIORuntime;
+use common_catalog::plan::StageFileStatus;
+use common_catalog::plan::StageTableInfo;
 use common_catalog::table::AppendMode;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
@@ -33,19 +36,26 @@ use common_formats::parse_timezone;
 use common_formats::FastFieldDecoderValues;
 use common_io::cursor_ext::ReadBytesExt;
 use common_io::cursor_ext::ReadCheckPointExt;
+use common_meta_types::UserStageInfo;
+use common_pipeline_core::Pipeline;
 use common_pipeline_sources::processors::sources::AsyncSource;
 use common_pipeline_sources::processors::sources::AsyncSourcer;
 use common_pipeline_transforms::processors::transforms::Transform;
 use common_sql::evaluator::ChunkOperator;
 use common_sql::evaluator::CompoundChunkOperator;
+use common_sql::executor::table_read_plan::ToReadDataSourcePlan;
 use common_sql::Metadata;
 use common_sql::MetadataRef;
+use common_storages_factory::Table;
+use common_storages_stage::StageTable;
+use common_users::UserApiProvider;
 use parking_lot::Mutex;
 use parking_lot::RwLock;
 
 use crate::interpreters::common::append2table;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterPtr;
+use crate::pipelines::processors::TransformAddOn;
 use crate::pipelines::PipelineBuildResult;
 use crate::pipelines::SourcePipeBuilder;
 use crate::schedulers::build_query_pipeline;
@@ -101,6 +111,113 @@ impl InsertInterpreterV2 {
         let cast_needed = select_schema != *output_schema;
         Ok(cast_needed)
     }
+
+    // TODO:(everpcpc)
+    async fn build_insert_from_stage_pipeline(
+        &self,
+        table: Arc<dyn Table>,
+        stage_location: &str,
+        pipeline: &mut Pipeline,
+    ) -> Result<()> {
+        let start = Instant::now();
+        let ctx = self.ctx.clone();
+        let table_ctx: Arc<dyn TableContext> = ctx.clone();
+        let source_schema = self.plan.schema();
+        let target_schema = table.schema();
+        let catalog_name = self.plan.catalog.clone();
+        let overwrite = self.plan.overwrite;
+
+        let (stage_info, path) = parse_stage_location(&self.ctx, stage_location).await?;
+
+        let mut stage_table_info = StageTableInfo {
+            schema: source_schema.clone(),
+            user_stage_info: stage_info,
+            path: path.to_string(),
+            files: vec![],
+            pattern: "".to_string(),
+            files_to_copy: None,
+        };
+
+        let all_source_file_infos = StageTable::list_files(&table_ctx, &stage_table_info).await?;
+
+        // TODO: color_copied_files
+
+        let mut need_copied_file_infos = vec![];
+        for file in &all_source_file_infos {
+            if file.status == StageFileStatus::NeedCopy {
+                need_copied_file_infos.push(file.clone());
+            }
+        }
+
+        // DEBUG:
+        tracing::warn!(
+            "insert: read all sideload files finished, all:{}, need copy:{}, elapsed:{}",
+            all_source_file_infos.len(),
+            need_copied_file_infos.len(),
+            start.elapsed().as_secs()
+        );
+
+        if need_copied_file_infos.is_empty() {
+            return Ok(());
+        }
+
+        stage_table_info.files_to_copy = Some(need_copied_file_infos.clone());
+        let stage_table = StageTable::try_create(stage_table_info.clone())?;
+        let read_source_plan = {
+            stage_table
+                .read_plan_with_catalog(ctx.clone(), catalog_name, None)
+                .await?
+        };
+
+        stage_table.read_data(table_ctx, &read_source_plan, pipeline)?;
+
+        let need_fill_missing_columns = target_schema != source_schema;
+        if need_fill_missing_columns {
+            pipeline.add_transform(|transform_input_port, transform_output_port| {
+                TransformAddOn::try_create(
+                    transform_input_port,
+                    transform_output_port,
+                    source_schema.clone(),
+                    target_schema.clone(),
+                    ctx.clone(),
+                )
+            })?;
+        }
+        table.append_data(ctx.clone(), pipeline, AppendMode::Copy, false)?;
+
+        pipeline.set_on_finished(move |may_error| {
+            // capture out variable
+            let overwrite = overwrite;
+            let ctx = ctx.clone();
+            let table = table.clone();
+
+            match may_error {
+                Some(error) => {
+                    tracing::error!("insert stage file error: {}", error);
+                    Err(may_error.as_ref().unwrap().clone())
+                }
+                None => {
+                    let append_entries = ctx.consume_precommit_blocks();
+                    // We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
+                    return GlobalIORuntime::instance().block_on(async move {
+                        // DEBUG:
+                        tracing::warn!(
+                            "insert: try to commit append entries:{}, elapsed:{}",
+                            append_entries.len(),
+                            start.elapsed().as_secs()
+                        );
+                        table
+                            .commit_insertion(ctx, append_entries, overwrite)
+                            .await?;
+                        Ok(())
+                        // TODO: purge copied files
+                    });
+                }
+            }
+        });
+
+        Ok(())
+    }
 }
 
 #[async_trait::async_trait]
@@ -156,6 +273,27 @@ impl Interpreter for InsertInterpreterV2 {
                     .format
                     .exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
             }
+            InsertInputSource::Sideload(opts) => {
+                // DEBUG:
+                tracing::warn!("==> sideload insert: {:?}", opts);
+
+                match &opts.stage {
+                    None => {
+                        return Err(ErrorCode::BadDataValueType(
+                            "No stage location provided".to_string(),
+                        ));
+                    }
+                    Some(stage_location) => {
+                        self.build_insert_from_stage_pipeline(
+                            table.clone(),
+                            stage_location,
+                            &mut build_res.main_pipeline,
+                        )
+                        .await?;
+                    }
+                }
+                return Ok(build_res);
+            }
             InsertInputSource::SelectPlan(plan) => {
                 let table1 = table.clone();
                 let (mut select_plan, select_column_bindings) = match plan.as_ref() {
@@ -602,3 +740,25 @@ async fn exprs_to_datavalue<'a>(
     let datavalues: Vec<DataValue> = res.columns().iter().skip(1).map(|col| col.get(0)).collect();
     Ok(datavalues)
 }
+
+// FIXME: tmp copy from src/query/sql/src/planner/binder/copy.rs
+async fn parse_stage_location(
+    ctx: &Arc<QueryContext>,
+    location: &str,
+) -> Result<(UserStageInfo, String)> {
+    let s: Vec<&str> = location.split('@').collect();
+    // @my_ext_stage/abc/
+    let names: Vec<&str> = s[1].splitn(2, '/').filter(|v| !v.is_empty()).collect();
+
+    let stage = if names[0] == "~" {
+        UserStageInfo::new_user_stage(&ctx.get_current_user()?.name)
+    } else {
+        UserApiProvider::instance()
+            .get_stage(&ctx.get_tenant(), names[0])
+            .await?
+    };
+
+    let path = names.get(1).unwrap_or(&"").trim_start_matches('/');
+
+    Ok((stage, path.to_string()))
+}
diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs
index 9fd05a26b19e0..62a3f182fc9ef 100644
--- a/src/query/service/src/servers/http/v1/query/http_query.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query.rs
@@ -21,6 +21,7 @@ use common_base::base::tokio;
 use common_base::base::tokio::sync::Mutex as TokioMutex;
 use common_base::base::tokio::sync::RwLock;
 use common_base::runtime::TrySpawn;
+use common_catalog::table_context::SideloadOptions;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use serde::Deserialize;
@@ -236,6 +237,14 @@ impl HttpQuery {
         let sql = &request.sql;
         tracing::info!("run query_id={id} in session_id={session_id}, sql='{sql}'");
 
+        match &request.sideload {
+            Some(sideload) => ctx.attach_sideload(SideloadOptions {
+                uri: sideload.url.clone(),
+                stage: sideload.stage.clone(),
+            }),
+            None => {}
+        };
+
         let (block_sender, block_receiver) = sized_spsc(request.pagination.max_rows_in_buffer);
         let start_time = Instant::now();
         let state = Arc::new(RwLock::new(Executor {
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index 2fbafa1378a28..ef350a18b62c0 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -32,6 +32,7 @@ use common_catalog::plan::DataSourcePlan;
 use common_catalog::plan::PartInfoPtr;
 use common_catalog::plan::Partitions;
 use common_catalog::plan::StageTableInfo;
+use common_catalog::table_context::SideloadOptions;
 use common_config::DATABEND_COMMIT_VERSION;
 use common_datablocks::DataBlock;
 use common_datavalues::DataValue;
@@ -192,6 +193,10 @@ impl QueryContext {
         self.shared.set_executor(weak_ptr)
     }
 
+    pub fn attach_sideload(&self, sideload: SideloadOptions) {
+        self.shared.attach_sideload(sideload);
+    }
+
     pub fn get_created_time(&self) -> SystemTime {
         self.shared.created_time
     }
@@ -354,6 +359,11 @@ impl TableContext for QueryContext {
     fn get_processes_info(&self) -> Vec<ProcessInfo> {
         SessionManager::instance().processes_info()
    }
+
+    // Get Sideload Options.
+    fn get_sideload(&self) -> Option<SideloadOptions> {
+        self.shared.get_sideload()
+    }
 }
 
 impl TrySpawn for QueryContext {
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index 2b15b0db2befd..d1c421ad045f6 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -22,6 +22,7 @@ use std::time::SystemTime;
 
 use common_base::base::Progress;
 use common_base::runtime::Runtime;
+use common_catalog::table_context::SideloadOptions;
 use common_config::Config;
 use common_datablocks::DataBlock;
 use common_exception::ErrorCode;
@@ -78,6 +79,7 @@ pub struct QueryContextShared {
     pub(in crate::sessions) data_operator: DataOperator,
     pub(in crate::sessions) executor: Arc<RwLock<Weak<PipelineExecutor>>>,
     pub(in crate::sessions) precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
+    pub(in crate::sessions) sideload_config: Arc<RwLock<Option<SideloadOptions>>>,
     pub(in crate::sessions) created_time: SystemTime,
 }
 
@@ -107,6 +109,7 @@ impl QueryContextShared {
             affect: Arc::new(Mutex::new(None)),
             executor: Arc::new(RwLock::new(Weak::new())),
             precommit_blocks: Arc::new(RwLock::new(vec![])),
+            sideload_config: Arc::new(RwLock::new(None)),
             created_time: SystemTime::now(),
         }))
     }
@@ -316,6 +319,15 @@ impl QueryContextShared {
         swaped_precommit_blocks
     }
 
+    pub fn get_sideload(&self) -> Option<SideloadOptions> {
+        self.sideload_config.read().clone()
+    }
+
+    pub fn attach_sideload(&self, sideload: SideloadOptions) {
+        let mut sideload_config = self.sideload_config.write();
+        *sideload_config = Some(sideload);
+    }
+
     pub fn get_created_time(&self) -> SystemTime {
         self.created_time
     }
diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs
index 53a7aa8ad8918..ea21047cc6e9b 100644
--- a/src/query/service/tests/it/storages/fuse/operations/commit.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -27,6 +27,7 @@ use common_catalog::plan::PartInfoPtr;
 use common_catalog::plan::Partitions;
 use common_catalog::table::Table;
 use common_catalog::table_context::ProcessInfo;
+use common_catalog::table_context::SideloadOptions;
 use common_catalog::table_context::TableContext;
 use common_datablocks::DataBlock;
 use common_exception::ErrorCode;
@@ -453,6 +454,10 @@ impl TableContext for CtxDelegation {
     fn get_processes_info(&self) -> Vec<ProcessInfo> {
         todo!()
     }
+
+    fn get_sideload(&self) -> Option<SideloadOptions> {
+        todo!()
+    }
 }
 
 #[derive(Clone)]
diff --git a/src/query/sql/src/planner/binder/insert.rs b/src/query/sql/src/planner/binder/insert.rs
index e1d95d1052823..9cc6373695839 100644
--- a/src/query/sql/src/planner/binder/insert.rs
+++ b/src/query/sql/src/planner/binder/insert.rs
@@ -96,10 +96,13 @@ impl<'a> Binder {
                     opts, start, None,
                 ))
             }
-            InsertSource::Values { rest_str } => {
-                let data = rest_str.trim_end_matches(';').trim_start().to_owned();
-                Ok(InsertInputSource::Values(data))
-            }
+            InsertSource::Values { rest_str } => match self.ctx.get_sideload() {
+                Some(sideload) => Ok(InsertInputSource::Sideload(Arc::new(sideload))),
+                None => {
+                    let data = rest_str.trim_end_matches(';').trim_start().to_owned();
+                    Ok(InsertInputSource::Values(data))
+                }
+            },
             InsertSource::Select { query } => {
                 let statement = Statement::Query(query);
                 let select_plan = self.bind_statement(bind_context, &statement).await?;
diff --git a/src/query/sql/src/planner/plans/insert.rs b/src/query/sql/src/planner/plans/insert.rs
index df63b2cb4658d..ed065f051ab9e 100644
--- a/src/query/sql/src/planner/plans/insert.rs
+++ b/src/query/sql/src/planner/plans/insert.rs
@@ -14,6 +14,7 @@
 
 use std::sync::Arc;
 
+use common_catalog::table_context::SideloadOptions;
 use common_datablocks::DataBlock;
 use common_datavalues::DataSchemaRef;
 use common_meta_types::FileFormatOptions;
@@ -31,6 +32,8 @@ pub enum InsertInputSource {
     StreamingWithFileFormat(FileFormatOptions, usize, Option<Arc<InputContext>>),
     // From cloned String and format
     Values(String),
+    // From sideload stage or uri
+    Sideload(Arc<SideloadOptions>),
 }
 
 #[derive(Clone)]

From 6929468e41ca6f702efc7e1ac8febcdad781ddf3 Mon Sep 17 00:00:00 2001
From: everpcpc
Date: Thu, 15 Dec 2022 18:58:59 +0800
Subject: [PATCH 3/6] fix: rename sideload to stage_attachment

---
 src/query/catalog/src/table_context.rs        |   9 +-
 .../src/interpreters/interpreter_insert_v2.rs | 124 +++++++++++++-----
 .../src/servers/http/v1/query/http_query.rs   |  21 +--
 src/query/service/src/sessions/query_ctx.rs   |  12 +-
 .../service/src/sessions/query_ctx_shared.rs  |  16 +--
 .../it/storages/fuse/operations/commit.rs     |   4 +-
 src/query/sql/src/planner/binder/insert.rs    |   4 +-
 src/query/sql/src/planner/plans/insert.rs     |   6 +-
 8 files changed, 130 insertions(+), 66 deletions(-)

diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index 25ecf4ea145bd..768160b42619f 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::net::SocketAddr;
 use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
@@ -55,9 +56,9 @@ pub struct ProcessInfo {
 }
 
 #[derive(Debug, Clone)]
-pub struct SideloadOptions {
-    pub uri: Option<String>,
-    pub stage: Option<String>,
+pub struct StageAttachment {
+    pub location: String,
+    pub params: BTreeMap<String, String>,
 }
 
 #[async_trait::async_trait]
@@ -105,5 +106,5 @@ pub trait TableContext: Send + Sync {
     async fn get_table(&self, catalog: &str, database: &str, table: &str) -> Result<Arc<dyn Table>>;
     fn get_processes_info(&self) -> Vec<ProcessInfo>;
-    fn get_sideload(&self) -> Option<SideloadOptions>;
+    fn get_stage_attachment(&self) -> Option<StageAttachment>;
 }
diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs
index b39a1b08e8920..11bfaf1bf12a7 100644
--- a/src/query/service/src/interpreters/interpreter_insert_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs
@@ -12,10 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::collections::VecDeque;
 use std::io::BufRead;
 use std::io::Cursor;
 use std::ops::Not;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -28,6 +30,7 @@ use common_base::runtime::GlobalIORuntime;
 use common_catalog::plan::StageFileStatus;
 use common_catalog::plan::StageTableInfo;
 use common_catalog::table::AppendMode;
+use common_catalog::table_context::StageAttachment;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 use common_exception::ErrorCode;
@@ -36,6 +39,9 @@ use common_formats::parse_timezone;
 use common_formats::FastFieldDecoderValues;
 use common_io::cursor_ext::ReadBytesExt;
 use common_io::cursor_ext::ReadCheckPointExt;
+use common_meta_types::OnErrorMode;
+use common_meta_types::StageFileCompression;
+use common_meta_types::StageFileFormatType;
 use common_meta_types::UserStageInfo;
 use common_pipeline_core::Pipeline;
 use common_pipeline_sources::processors::sources::AsyncSource;
@@ -112,11 +118,74 @@ impl InsertInterpreterV2 {
         Ok(cast_needed)
     }
 
-    // TODO:(everpcpc)
+    fn apply_stage_options(
+        &self,
+        stage: &mut UserStageInfo,
+        params: &BTreeMap<String, String>,
+    ) -> Result<()> {
+        for (k, v) in params.iter() {
+            match k.as_str() {
+                // file format options
+                "format" => {
+                    let format = StageFileFormatType::from_str(v)?;
+                    stage.file_format_options.format = format;
+                }
+                "skip_header" => {
+                    let skip_header = u64::from_str(v)?;
+                    stage.file_format_options.skip_header = skip_header;
+                }
+                "field_delimiter" => stage.file_format_options.field_delimiter = v.clone(),
+                "record_delimiter" => stage.file_format_options.record_delimiter = v.clone(),
+                "nan_display" => stage.file_format_options.nan_display = v.clone(),
+                "escape" => stage.file_format_options.escape = v.clone(),
+                "compression" => {
+                    let compression = StageFileCompression::from_str(v)?;
+                    stage.file_format_options.compression = compression;
+                }
+                "row_tag" => stage.file_format_options.row_tag = v.clone(),
+                "quote" => stage.file_format_options.quote = v.clone(),
+
+                // copy options
+                "on_error" => {
+                    let on_error = OnErrorMode::from_str(v)?;
+                    stage.copy_options.on_error = on_error;
+                }
+                "size_limit" => {
+                    let size_limit = usize::from_str(v)?;
+                    stage.copy_options.size_limit = size_limit;
+                }
+                "split_size" => {
+                    let split_size = usize::from_str(v)?;
+                    stage.copy_options.split_size = split_size;
+                }
+                "purge" => {
+                    let purge = bool::from_str(v).map_err(|_| {
+                        ErrorCode::StrParseError(format!("Cannot parse purge: {} as bool", v))
+                    })?;
+                    stage.copy_options.purge = purge;
+                }
+                "single" => {
+                    let single = bool::from_str(v).map_err(|_| {
+                        ErrorCode::StrParseError(format!("Cannot parse single: {} as bool", v))
+                    })?;
+                    stage.copy_options.single = single;
+                }
+                "max_file_size" => {
+                    let max_file_size = usize::from_str(v)?;
+                    stage.copy_options.max_file_size = max_file_size;
+                }
+
+                _ => {}
+            }
+        }
+
+        Ok(())
+    }
+
     async fn build_insert_from_stage_pipeline(
         &self,
         table: Arc<dyn Table>,
-        stage_location: &str,
+        attachment: Arc<StageAttachment>,
         pipeline: &mut Pipeline,
     ) -> Result<()> {
         let start = Instant::now();
@@ -127,7 +196,8 @@ impl InsertInterpreterV2 {
         let catalog_name = self.plan.catalog.clone();
         let overwrite = self.plan.overwrite;
 
-        let (stage_info, path) = parse_stage_location(&self.ctx, stage_location).await?;
+        let (mut stage_info, path) = parse_stage_location(&self.ctx, &attachment.location).await?;
+        self.apply_stage_options(&mut stage_info, &attachment.params)?;
 
         let mut stage_table_info = StageTableInfo {
             schema: source_schema.clone(),
@@ -140,7 +210,7 @@ impl InsertInterpreterV2 {
 
         let all_source_file_infos = StageTable::list_files(&table_ctx, &stage_table_info).await?;
 
-        // TODO: color_copied_files
+        // TODO:(everpcpc) color_copied_files
 
         let mut need_copied_file_infos = vec![];
         for file in &all_source_file_infos {
@@ -149,9 +219,8 @@ impl InsertInterpreterV2 {
             }
         }
 
-        // DEBUG:
-        tracing::warn!(
-            "insert: read all sideload files finished, all:{}, need copy:{}, elapsed:{}",
+        tracing::info!(
+            "insert: read all stage attachment files finished, all:{}, need copy:{}, elapsed:{}",
             all_source_file_infos.len(),
             need_copied_file_infos.len(),
             start.elapsed().as_secs()
@@ -199,9 +268,8 @@ impl InsertInterpreterV2 {
                 None => {
                     let append_entries = ctx.consume_precommit_blocks();
                     // We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
-                    return GlobalIORuntime::instance().block_on(async move {
-                        // DEBUG:
-                        tracing::warn!(
+                    GlobalIORuntime::instance().block_on(async move {
+                        tracing::info!(
                             "insert: try to commit append entries:{}, elapsed:{}",
                             append_entries.len(),
                             start.elapsed().as_secs()
@@ -209,9 +277,11 @@ impl InsertInterpreterV2 {
                         table
                             .commit_insertion(ctx, append_entries, overwrite)
                             .await?;
+
+                        // TODO:(everpcpc) purge copied files
+
                         Ok(())
-                        // TODO: purge copied files
-                    });
+                    })
                 }
             }
         });
@@ -273,25 +343,14 @@ impl Interpreter for InsertInterpreterV2 {
                     .format
                     .exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
             }
-            InsertInputSource::Sideload(opts) => {
-                // DEBUG:
-                tracing::warn!("==> sideload insert: {:?}", opts);
-
-                match &opts.stage {
-                    None => {
-                        return Err(ErrorCode::BadDataValueType(
-                            "No stage location provided".to_string(),
-                        ));
-                    }
-                    Some(stage_location) => {
-                        self.build_insert_from_stage_pipeline(
-                            table.clone(),
-                            stage_location,
-                            &mut build_res.main_pipeline,
-                        )
-                        .await?;
-                    }
-                }
+            InsertInputSource::Stage(opts) => {
+                tracing::info!("insert: from stage with options {:?}", opts);
+                self.build_insert_from_stage_pipeline(
+                    table.clone(),
+                    opts.clone(),
+                    &mut build_res.main_pipeline,
+                )
+                .await?;
                 return Ok(build_res);
             }
             InsertInputSource::SelectPlan(plan) => {
@@ -741,7 +800,8 @@ async fn exprs_to_datavalue<'a>(
     Ok(datavalues)
 }
 
-// FIXME: tmp copy from src/query/sql/src/planner/binder/copy.rs
+// TODO:(everpcpc) tmp copy from src/query/sql/src/planner/binder/copy.rs
+// move to user stage module
 async fn parse_stage_location(
     ctx: &Arc<QueryContext>,
     location: &str,
diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs
index 62a3f182fc9ef..1342720555eb8 100644
--- a/src/query/service/src/servers/http/v1/query/http_query.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query.rs
@@ -21,7 +21,7 @@ use common_base::base::tokio;
 use common_base::base::tokio::sync::Mutex as TokioMutex;
 use common_base::base::tokio::sync::RwLock;
 use common_base::runtime::TrySpawn;
-use common_catalog::table_context::SideloadOptions;
+use common_catalog::table_context::StageAttachment;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use serde::Deserialize;
@@ -60,7 +60,7 @@ pub struct HttpQueryRequest {
     pub pagination: PaginationConf,
     #[serde(default = "default_as_true")]
     pub string_fields: bool,
-    pub sideload: Option<SideloadConf>,
+    pub stage_attachment: Option<StageAttachmentConf>,
 }
 
 const DEFAULT_MAX_ROWS_IN_BUFFER: usize = 5 * 1000 * 1000;
@@ -144,9 +144,9 @@ impl HttpSessionConf {
 }
 
 #[derive(Deserialize, Debug, Clone)]
-pub struct SideloadConf {
-    pub(crate) stage: Option<String>,
-    pub(crate) url: Option<String>,
+pub struct StageAttachmentConf {
+    pub(crate) location: String,
+    pub(crate) params: Option<BTreeMap<String, String>>,
 }
 
 #[derive(Debug, Clone)]
@@ -237,10 +237,13 @@ impl HttpQuery {
         let sql = &request.sql;
         tracing::info!("run query_id={id} in session_id={session_id}, sql='{sql}'");
 
-        match &request.sideload {
-            Some(sideload) => ctx.attach_sideload(SideloadOptions {
-                uri: sideload.url.clone(),
-                stage: sideload.stage.clone(),
+        match &request.stage_attachment {
+            Some(attachment) => ctx.attach_stage(StageAttachment {
+                location: attachment.location.clone(),
+                params: match attachment.params {
+                    Some(ref params) => params.clone(),
+                    None => BTreeMap::new(),
+                },
             }),
             None => {}
         };
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index ef350a18b62c0..5f26516d594cc 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -32,7 +32,7 @@ use common_catalog::plan::DataSourcePlan;
 use common_catalog::plan::PartInfoPtr;
 use common_catalog::plan::Partitions;
 use common_catalog::plan::StageTableInfo;
-use common_catalog::table_context::SideloadOptions;
+use common_catalog::table_context::StageAttachment;
 use common_config::DATABEND_COMMIT_VERSION;
 use common_datablocks::DataBlock;
 use common_datavalues::DataValue;
@@ -193,8 +193,8 @@ impl QueryContext {
         self.shared.set_executor(weak_ptr)
     }
 
-    pub fn attach_sideload(&self, sideload: SideloadOptions) {
-        self.shared.attach_sideload(sideload);
+    pub fn attach_stage(&self, attachment: StageAttachment) {
+        self.shared.attach_stage(attachment);
     }
 
     pub fn get_created_time(&self) -> SystemTime {
@@ -360,9 +360,9 @@ impl TableContext for QueryContext {
         SessionManager::instance().processes_info()
     }
 
-    // Get Sideload Options.
-    fn get_sideload(&self) -> Option<SideloadOptions> {
-        self.shared.get_sideload()
+    // Get Stage Attachment.
+    fn get_stage_attachment(&self) -> Option<StageAttachment> {
+        self.shared.get_stage_attachment()
     }
 }
 
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index d1c421ad045f6..a66a0909e55b8 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -22,7 +22,7 @@ use std::time::SystemTime;
 
 use common_base::base::Progress;
 use common_base::runtime::Runtime;
-use common_catalog::table_context::SideloadOptions;
+use common_catalog::table_context::StageAttachment;
 use common_config::Config;
 use common_datablocks::DataBlock;
 use common_exception::ErrorCode;
@@ -78,7 +79,7 @@ pub struct QueryContextShared {
     pub(in crate::sessions) data_operator: DataOperator,
     pub(in crate::sessions) executor: Arc<RwLock<Weak<PipelineExecutor>>>,
     pub(in crate::sessions) precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
-    pub(in crate::sessions) sideload_config: Arc<RwLock<Option<SideloadOptions>>>,
+    pub(in crate::sessions) stage_attachment: Arc<RwLock<Option<StageAttachment>>>,
     pub(in crate::sessions) created_time: SystemTime,
 }
 
@@ -109,7 +109,7 @@ impl QueryContextShared {
             affect: Arc::new(Mutex::new(None)),
             executor: Arc::new(RwLock::new(Weak::new())),
             precommit_blocks: Arc::new(RwLock::new(vec![])),
-            sideload_config: Arc::new(RwLock::new(None)),
+            stage_attachment: Arc::new(RwLock::new(None)),
             created_time: SystemTime::now(),
         }))
     }
@@ -319,13 +319,13 @@ impl QueryContextShared {
         swaped_precommit_blocks
     }
 
-    pub fn get_sideload(&self) -> Option<SideloadOptions> {
-        self.sideload_config.read().clone()
+    pub fn get_stage_attachment(&self) -> Option<StageAttachment> {
+        self.stage_attachment.read().clone()
     }
 
-    pub fn attach_sideload(&self, sideload: SideloadOptions) {
-        let mut sideload_config = self.sideload_config.write();
-        *sideload_config = Some(sideload);
+    pub fn attach_stage(&self, attachment: StageAttachment) {
+        let mut stage_attachment = self.stage_attachment.write();
+        *stage_attachment = Some(attachment);
     }
 
     pub fn get_created_time(&self) -> SystemTime {
diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs
index ea21047cc6e9b..61904154ad021 100644
--- a/src/query/service/tests/it/storages/fuse/operations/commit.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -27,7 +27,7 @@ use common_catalog::plan::PartInfoPtr;
 use common_catalog::plan::Partitions;
 use common_catalog::table::Table;
 use common_catalog::table_context::ProcessInfo;
-use common_catalog::table_context::SideloadOptions;
+use common_catalog::table_context::StageAttachment;
 use common_catalog::table_context::TableContext;
 use common_datablocks::DataBlock;
 use common_exception::ErrorCode;
@@ -455,7 +455,7 @@ impl TableContext for CtxDelegation {
         todo!()
     }
 
-    fn get_sideload(&self) -> Option<SideloadOptions> {
+    fn get_stage_attachment(&self) -> Option<StageAttachment> {
         todo!()
     }
 }
diff --git a/src/query/sql/src/planner/binder/insert.rs b/src/query/sql/src/planner/binder/insert.rs
index 9cc6373695839..a6beccc7e744c 100644
--- a/src/query/sql/src/planner/binder/insert.rs
+++ b/src/query/sql/src/planner/binder/insert.rs
@@ -96,8 +96,8 @@ impl<'a> Binder {
                     opts, start, None,
                 ))
             }
-            InsertSource::Values { rest_str } => match self.ctx.get_sideload() {
-                Some(sideload) => Ok(InsertInputSource::Sideload(Arc::new(sideload))),
+            InsertSource::Values { rest_str } => match self.ctx.get_stage_attachment() {
+                Some(attachment) => Ok(InsertInputSource::Stage(Arc::new(attachment))),
                 None => {
                     let data = rest_str.trim_end_matches(';').trim_start().to_owned();
                     Ok(InsertInputSource::Values(data))
diff --git a/src/query/sql/src/planner/plans/insert.rs b/src/query/sql/src/planner/plans/insert.rs
index ed065f051ab9e..e0aadc873cdc7 100644
--- a/src/query/sql/src/planner/plans/insert.rs
+++ b/src/query/sql/src/planner/plans/insert.rs
@@ -14,7 +14,7 @@
 
 use std::sync::Arc;
 
-use common_catalog::table_context::SideloadOptions;
+use common_catalog::table_context::StageAttachment;
 use common_datablocks::DataBlock;
 use common_datavalues::DataSchemaRef;
 use common_meta_types::FileFormatOptions;
@@ -32,8 +32,8 @@ pub enum InsertInputSource {
     StreamingWithFileFormat(FileFormatOptions, usize, Option<Arc<InputContext>>),
     // From cloned String and format
     Values(String),
-    // From sideload stage or uri
-    Sideload(Arc<SideloadOptions>),
+    // From stage
+    Stage(Arc<StageAttachment>),
 }
 
 #[derive(Clone)]

From ef514e3cc046b8fc8a99d9305e85e5f3f08a7ed0 Mon Sep 17 00:00:00 2001
From: everpcpc
Date: Thu, 15 Dec 2022 22:14:52 +0800
Subject: [PATCH 4/6] fix: do not check stage file for need_copied_file

---
 .../src/interpreters/interpreter_insert_v2.rs | 17 ++---------------
 1 file changed, 2 insertions(+), 15 deletions(-)

diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs
index 11bfaf1bf12a7..8e9f6c12598cc 100644
--- a/src/query/service/src/interpreters/interpreter_insert_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs
@@ -27,7 +27,6 @@ use common_ast::parser::parse_comma_separated_exprs;
 use common_ast::parser::tokenize_sql;
 use common_ast::Backtrace;
 use common_base::runtime::GlobalIORuntime;
-use common_catalog::plan::StageFileStatus;
 use common_catalog::plan::StageTableInfo;
 use common_catalog::table::AppendMode;
 use common_catalog::table_context::StageAttachment;
@@ -212,25 +211,13 @@ impl InsertInterpreterV2 {
 
         // TODO:(everpcpc) color_copied_files
 
-        let mut need_copied_file_infos = vec![];
-        for file in &all_source_file_infos {
-            if file.status == StageFileStatus::NeedCopy {
-                need_copied_file_infos.push(file.clone());
-            }
-        }
-
-        tracing::info!(
-            "insert: read all stage attachment files finished, all:{}, need copy:{}, elapsed:{}",
+        tracing::info!(
+            "insert: read all stage attachment files finished: {}, elapsed:{}",
             all_source_file_infos.len(),
-            need_copied_file_infos.len(),
             start.elapsed().as_secs()
         );
 
-        if need_copied_file_infos.is_empty() {
-            return Ok(());
-        }
-
-        stage_table_info.files_to_copy = Some(need_copied_file_infos.clone());
+        stage_table_info.files_to_copy = Some(all_source_file_infos.clone());
         let stage_table = StageTable::try_create(stage_table_info.clone())?;
         let read_source_plan = {
             stage_table

From df5c26598a5321b78ab0b8062fcc2f3c5cb4cf0e Mon Sep 17 00:00:00 2001
From: everpcpc
Date: Fri, 16 Dec 2022 11:31:48 +0800
Subject: [PATCH 5/6] fix: separate format & copy options for stage

---
 src/meta/types/src/user_stage.rs              | 85 +++++++++++++++++++
 src/query/catalog/src/table_context.rs        |  3 +-
 .../src/interpreters/interpreter_insert_v2.rs | 74 +---------------
 .../src/servers/http/v1/query/http_query.rs   |  9 +-
 4 files changed, 96 insertions(+), 75 deletions(-)

diff --git a/src/meta/types/src/user_stage.rs b/src/meta/types/src/user_stage.rs
index f864f6733b8a6..da5e26a071730 100644
--- a/src/meta/types/src/user_stage.rs
+++ b/src/meta/types/src/user_stage.rs
@@ -12,11 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::fmt;
 use std::str::FromStr;
 
 use chrono::DateTime;
 use chrono::Utc;
+use common_exception::ErrorCode;
+use common_exception::Result;
 use common_io::consts::NAN_BYTES_SNAKE;
 use common_storage::StorageParams;
 
@@ -338,6 +341,88 @@ impl UserStageInfo {
             StageType::User => format!("/stage/user/{}/", self.stage_name),
         }
     }
+
+    /// Apply the file format options.
+    pub fn apply_format_options(&mut self, opts: &BTreeMap<String, String>) -> Result<()> {
+        if opts.is_empty() {
+            return Ok(());
+        }
+        for (k, v) in opts.iter() {
+            match k.as_str() {
+                "format" => {
+                    let format = StageFileFormatType::from_str(v)?;
+                    self.file_format_options.format = format;
+                }
+                "skip_header" => {
+                    let skip_header = u64::from_str(v)?;
+                    self.file_format_options.skip_header = skip_header;
+                }
+                "field_delimiter" => self.file_format_options.field_delimiter = v.clone(),
+                "record_delimiter" => self.file_format_options.record_delimiter = v.clone(),
+                "nan_display" => self.file_format_options.nan_display = v.clone(),
+                "escape" => self.file_format_options.escape = v.clone(),
+                "compression" => {
+                    let compression = StageFileCompression::from_str(v)?;
+                    self.file_format_options.compression = compression;
+                }
+                "row_tag" => self.file_format_options.row_tag = v.clone(),
+                "quote" => self.file_format_options.quote = v.clone(),
+                _ => {
+                    return Err(ErrorCode::BadArguments(format!(
+                        "Unknown stage file format option {}",
+                        k
+                    )));
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Apply the copy options.
+    pub fn apply_copy_options(&mut self, opts: &BTreeMap<String, String>) -> Result<()> {
+        if opts.is_empty() {
+            return Ok(());
+        }
+        for (k, v) in opts.iter() {
+            match k.as_str() {
+                "on_error" => {
+                    let on_error = OnErrorMode::from_str(v)?;
+                    self.copy_options.on_error = on_error;
+                }
+                "size_limit" => {
+                    let size_limit = usize::from_str(v)?;
+                    self.copy_options.size_limit = size_limit;
+                }
+                "split_size" => {
+                    let split_size = usize::from_str(v)?;
+                    self.copy_options.split_size = split_size;
+                }
+                "purge" => {
+                    let purge = bool::from_str(v).map_err(|_| {
+                        ErrorCode::StrParseError(format!("Cannot parse purge: {} as bool", v))
+                    })?;
+                    self.copy_options.purge = purge;
+                }
+                "single" => {
+                    let single = bool::from_str(v).map_err(|_| {
+                        ErrorCode::StrParseError(format!("Cannot parse single: {} as bool", v))
+                    })?;
+                    self.copy_options.single = single;
+                }
+                "max_file_size" => {
+                    let max_file_size = usize::from_str(v)?;
+                    self.copy_options.max_file_size = max_file_size;
+                }
+                _ => {
+                    return Err(ErrorCode::BadArguments(format!(
+                        "Unknown stage copy option {}",
+                        k
+                    )));
+                }
+            }
+        }
+        Ok(())
+    }
 }
 
 #[derive(Default, Debug, Clone, PartialEq, Eq)]
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index 768160b42619f..160d11fc47375 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -58,7 +58,8 @@ pub struct ProcessInfo {
 #[derive(Debug, Clone)]
 pub struct StageAttachment {
     pub location: String,
-    pub params: BTreeMap<String, String>,
+    pub format_options: BTreeMap<String, String>,
+    pub copy_options: BTreeMap<String, String>,
 }
 
 #[async_trait::async_trait]
diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs
index 8e9f6c12598cc..0e75ca97aa2cc 100644
--- a/src/query/service/src/interpreters/interpreter_insert_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs
@@ -12,12 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
 use std::collections::VecDeque;
 use std::io::BufRead;
 use std::io::Cursor;
 use std::ops::Not;
-use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -38,9 +36,6 @@ use common_formats::parse_timezone;
 use common_formats::FastFieldDecoderValues;
 use common_io::cursor_ext::ReadBytesExt;
 use common_io::cursor_ext::ReadCheckPointExt;
-use common_meta_types::OnErrorMode;
-use common_meta_types::StageFileCompression;
-use common_meta_types::StageFileFormatType;
 use common_meta_types::UserStageInfo;
 use common_pipeline_core::Pipeline;
 use common_pipeline_sources::processors::sources::AsyncSource;
@@ -117,70 +112,6 @@ impl InsertInterpreterV2 {
         Ok(cast_needed)
     }
 
-    fn apply_stage_options(
-        &self,
-        stage: &mut UserStageInfo,
-        params: &BTreeMap<String, String>,
-    ) -> Result<()> {
-        for (k, v) in params.iter() {
-            match k.as_str() {
-                // file format options
-                "format" => {
-                    let format = StageFileFormatType::from_str(v)?;
-                    stage.file_format_options.format = format;
-                }
-                "skip_header" => {
-                    let skip_header = u64::from_str(v)?;
-                    stage.file_format_options.skip_header = skip_header;
-                }
-                "field_delimiter" => stage.file_format_options.field_delimiter = v.clone(),
-                "record_delimiter" => stage.file_format_options.record_delimiter = v.clone(),
-                "nan_display" => stage.file_format_options.nan_display = v.clone(),
-                "escape" => stage.file_format_options.escape = v.clone(),
-                "compression" => {
-                    let compression = StageFileCompression::from_str(v)?;
-                    stage.file_format_options.compression = compression;
-                }
-                "row_tag" => stage.file_format_options.row_tag = v.clone(),
-                "quote" => stage.file_format_options.quote = v.clone(),
-
-                // copy options
-                "on_error" => {
-                    let on_error = OnErrorMode::from_str(v)?;
-                    stage.copy_options.on_error = on_error;
-                }
-                "size_limit" => {
-                    let size_limit = usize::from_str(v)?;
-                    stage.copy_options.size_limit = size_limit;
-                }
-                "split_size" => {
-                    let split_size = usize::from_str(v)?;
-                    stage.copy_options.split_size = split_size;
-                }
-                "purge" => {
-                    let purge = bool::from_str(v).map_err(|_| {
-                        ErrorCode::StrParseError(format!("Cannot parse purge: {} as bool", v))
-                    })?;
-                    stage.copy_options.purge = purge;
-                }
-                "single" => {
-                    let single = bool::from_str(v).map_err(|_| {
-                        ErrorCode::StrParseError(format!("Cannot parse single: {} as bool", v))
-                    })?;
-                    stage.copy_options.single = single;
-                }
-                "max_file_size" => {
-                    let max_file_size = usize::from_str(v)?;
-                    stage.copy_options.max_file_size = max_file_size;
-                }
-
-                _ => {}
-            }
-        }
-
-        Ok(())
-    }
-
     async fn build_insert_from_stage_pipeline(
         &self,
         table: Arc<dyn Table>,
         attachment: Arc<StageAttachment>,
         pipeline: &mut Pipeline,
     ) -> Result<()> {
         let start = Instant::now();
@@ -196,7 +127,8 @@ impl InsertInterpreterV2 {
         let catalog_name = self.plan.catalog.clone();
         let overwrite = self.plan.overwrite;
 
         let (mut stage_info, path) = parse_stage_location(&self.ctx, &attachment.location).await?;
-        self.apply_stage_options(&mut stage_info, &attachment.params)?;
+        stage_info.apply_format_options(&attachment.format_options)?;
+        stage_info.apply_copy_options(&attachment.copy_options)?;
 
         let mut stage_table_info = StageTableInfo {
             schema: source_schema.clone(),
@@ -209,8 +141,6 @@ impl InsertInterpreterV2 {
 
         let all_source_file_infos = StageTable::list_files(&table_ctx, &stage_table_info).await?;
 
-        // TODO:(everpcpc) color_copied_files
-
         tracing::info!(
             "insert: read all stage attachment files finished: {}, elapsed:{}",
             all_source_file_infos.len(),
diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs
index 1342720555eb8..9a036a4c89520 100644
--- a/src/query/service/src/servers/http/v1/query/http_query.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query.rs
@@ -146,7 +146,8 @@ impl HttpSessionConf {
 #[derive(Deserialize, Debug, Clone)]
 pub struct StageAttachmentConf {
     pub(crate) location: String,
-    pub(crate) params: Option<BTreeMap<String, String>>,
+    pub(crate) format_options: Option<BTreeMap<String, String>>,
+    pub(crate) copy_options: Option<BTreeMap<String, String>>,
 }
 
 #[derive(Debug, Clone)]
@@ -240,7 +241,11 @@ impl HttpQuery {
         match &request.stage_attachment {
             Some(attachment) => ctx.attach_stage(StageAttachment {
                 location: attachment.location.clone(),
-                params: match attachment.params {
+                format_options: match attachment.format_options {
+                    Some(ref params) => params.clone(),
+                    None => BTreeMap::new(),
+                },
+                copy_options: match attachment.copy_options {
                     Some(ref params) => params.clone(),
                     None => BTreeMap::new(),
                 },

From ced20aaa183a13530282d0e1b4f674cc3160d516 Mon Sep 17 00:00:00 2001
From: everpcpc
Date: Fri, 16 Dec 2022 11:37:35 +0800
Subject: [PATCH 6/6] feat: add example for StageAttachmentConf location

---
 src/query/service/src/servers/http/v1/query/http_query.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs
index 9a036a4c89520..97805c9772671 100644
--- a/src/query/service/src/servers/http/v1/query/http_query.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query.rs
@@ -145,6 +145,8 @@ impl HttpSessionConf {
 
 #[derive(Deserialize, Debug, Clone)]
 pub struct StageAttachmentConf {
+    /// location of the stage
+    /// for example: @stage_name/path/to/file, @~/path/to/file
     pub(crate) location: String,
     pub(crate) format_options: Option<BTreeMap<String, String>>,
     pub(crate) copy_options: Option<BTreeMap<String, String>>,
 }
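
Usage sketch (not part of the patch series): the standalone Rust program below illustrates the request-body shape that the final `stage_attachment` field of `HttpQueryRequest` accepts. The mirror struct copies the shape of `StageAttachmentConf` from http_query.rs above; the concrete JSON payload, stage name, and option values are assumptions for illustration only. The field names (`location`, `format_options`, `copy_options`) and option keys (`format`, `skip_header`, `purge`) are taken from the diffs, and `apply_format_options`/`apply_copy_options` in user_stage.rs reject keys outside the sets shown there.

use std::collections::BTreeMap;

use serde::Deserialize;

// Standalone mirror of StageAttachmentConf for illustration only
// (the real struct lives in http_query.rs and has pub(crate) fields).
#[derive(Deserialize, Debug)]
struct StageAttachmentConf {
    location: String,
    format_options: Option<BTreeMap<String, String>>,
    copy_options: Option<BTreeMap<String, String>>,
}

fn main() -> Result<(), serde_json::Error> {
    // Hypothetical `stage_attachment` fragment of a POST /v1/query body:
    // the INSERT reads its values from a staged file instead of inline SQL text.
    let body = r#"{
        "location": "@my_stage/path/to/data.csv",
        "format_options": {"format": "CSV", "skip_header": "1"},
        "copy_options": {"purge": "true"}
    }"#;

    // Missing maps deserialize to None, matching the Option fields;
    // the handler then substitutes BTreeMap::new() as in patch 5.
    let conf: StageAttachmentConf = serde_json::from_str(body)?;
    assert_eq!(conf.location, "@my_stage/path/to/data.csv");
    println!("parsed stage attachment: {conf:?}");
    Ok(())
}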