diff --git a/scripts/ci/deploy/config/databend-query-node-1.toml b/scripts/ci/deploy/config/databend-query-node-1.toml
index f9df888b94ba9..8b89f012db744 100644
--- a/scripts/ci/deploy/config/databend-query-node-1.toml
+++ b/scripts/ci/deploy/config/databend-query-node-1.toml
@@ -60,7 +60,7 @@ table_cache_bloom_index_data_bytes=1073741824
 [log]
 
 [log.file]
-level = "ERROR"
+level = "WARN"
 format = "text"
 dir = "./.databend/logs_1"
diff --git a/src/meta/types/src/user_stage.rs b/src/meta/types/src/user_stage.rs
index f864f6733b8a6..da5e26a071730 100644
--- a/src/meta/types/src/user_stage.rs
+++ b/src/meta/types/src/user_stage.rs
@@ -12,11 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::fmt;
 use std::str::FromStr;
 
 use chrono::DateTime;
 use chrono::Utc;
+use common_exception::ErrorCode;
+use common_exception::Result;
 use common_io::consts::NAN_BYTES_SNAKE;
 use common_storage::StorageParams;
@@ -338,6 +341,88 @@ impl UserStageInfo {
             StageType::User => format!("/stage/user/{}/", self.stage_name),
         }
     }
+
+    /// Apply the file format options.
+    pub fn apply_format_options(&mut self, opts: &BTreeMap<String, String>) -> Result<()> {
+        if opts.is_empty() {
+            return Ok(());
+        }
+        for (k, v) in opts.iter() {
+            match k.as_str() {
+                "format" => {
+                    let format = StageFileFormatType::from_str(v)?;
+                    self.file_format_options.format = format;
+                }
+                "skip_header" => {
+                    let skip_header = u64::from_str(v)?;
+                    self.file_format_options.skip_header = skip_header;
+                }
+                "field_delimiter" => self.file_format_options.field_delimiter = v.clone(),
+                "record_delimiter" => self.file_format_options.record_delimiter = v.clone(),
+                "nan_display" => self.file_format_options.nan_display = v.clone(),
+                "escape" => self.file_format_options.escape = v.clone(),
+                "compression" => {
+                    let compression = StageFileCompression::from_str(v)?;
+                    self.file_format_options.compression = compression;
+                }
+                "row_tag" => self.file_format_options.row_tag = v.clone(),
+                "quote" => self.file_format_options.quote = v.clone(),
+                _ => {
+                    return Err(ErrorCode::BadArguments(format!(
+                        "Unknown stage file format option {}",
+                        k
+                    )));
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Apply the copy options.
+    pub fn apply_copy_options(&mut self, opts: &BTreeMap<String, String>) -> Result<()> {
+        if opts.is_empty() {
+            return Ok(());
+        }
+        for (k, v) in opts.iter() {
+            match k.as_str() {
+                "on_error" => {
+                    let on_error = OnErrorMode::from_str(v)?;
+                    self.copy_options.on_error = on_error;
+                }
+                "size_limit" => {
+                    let size_limit = usize::from_str(v)?;
+                    self.copy_options.size_limit = size_limit;
+                }
+                "split_size" => {
+                    let split_size = usize::from_str(v)?;
+                    self.copy_options.split_size = split_size;
+                }
+                "purge" => {
+                    let purge = bool::from_str(v).map_err(|_| {
+                        ErrorCode::StrParseError(format!("Cannot parse purge: {} as bool", v))
+                    })?;
+                    self.copy_options.purge = purge;
+                }
+                "single" => {
+                    let single = bool::from_str(v).map_err(|_| {
+                        ErrorCode::StrParseError(format!("Cannot parse single: {} as bool", v))
+                    })?;
+                    self.copy_options.single = single;
+                }
+                "max_file_size" => {
+                    let max_file_size = usize::from_str(v)?;
+                    self.copy_options.max_file_size = max_file_size;
+                }
+                _ => {
+                    return Err(ErrorCode::BadArguments(format!(
+                        "Unknown stage copy option {}",
+                        k
+                    )));
+                }
+            }
+        }
+        Ok(())
+    }
 }
 
 #[derive(Default, Debug, Clone, PartialEq, Eq)]
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index a77489f494804..160d11fc47375 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::net::SocketAddr;
 use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
@@ -54,6 +55,13 @@ pub struct ProcessInfo {
     pub created_time: SystemTime,
 }
 
+#[derive(Debug, Clone)]
+pub struct StageAttachment {
+    pub location: String,
+    pub format_options: BTreeMap<String, String>,
+    pub copy_options: BTreeMap<String, String>,
+}
+
 #[async_trait::async_trait]
 pub trait TableContext: Send + Sync {
     /// Build a table instance the plan wants to operate on.
@@ -99,4 +107,5 @@ pub trait TableContext: Send + Sync {
     async fn get_table(&self, catalog: &str, database: &str, table: &str)
         -> Result<Arc<dyn Table>>;
     fn get_processes_info(&self) -> Vec<ProcessInfo>;
+    fn get_stage_attachment(&self) -> Option<StageAttachment>;
 }
diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs
index c45f1c5bdb17d..0e75ca97aa2cc 100644
--- a/src/query/service/src/interpreters/interpreter_insert_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs
@@ -17,6 +17,7 @@ use std::io::BufRead;
 use std::io::Cursor;
 use std::ops::Not;
 use std::sync::Arc;
+use std::time::Instant;
 
 use aho_corasick::AhoCorasick;
 use common_ast::ast::Expr;
@@ -24,7 +25,9 @@ use common_ast::parser::parse_comma_separated_exprs;
 use common_ast::parser::tokenize_sql;
 use common_ast::Backtrace;
 use common_base::runtime::GlobalIORuntime;
+use common_catalog::plan::StageTableInfo;
 use common_catalog::table::AppendMode;
+use common_catalog::table_context::StageAttachment;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 use common_exception::ErrorCode;
@@ -33,19 +36,26 @@ use common_formats::parse_timezone;
 use common_formats::FastFieldDecoderValues;
 use common_io::cursor_ext::ReadBytesExt;
 use common_io::cursor_ext::ReadCheckPointExt;
+use common_meta_types::UserStageInfo;
+use common_pipeline_core::Pipeline;
 use common_pipeline_sources::processors::sources::AsyncSource;
 use common_pipeline_sources::processors::sources::AsyncSourcer;
 use common_pipeline_transforms::processors::transforms::Transform;
 use common_sql::evaluator::ChunkOperator;
 use common_sql::evaluator::CompoundChunkOperator;
+use common_sql::executor::table_read_plan::ToReadDataSourcePlan;
 use common_sql::Metadata;
 use common_sql::MetadataRef;
+use common_storages_factory::Table;
+use common_storages_stage::StageTable;
+use common_users::UserApiProvider;
 use parking_lot::Mutex;
 use parking_lot::RwLock;
 
 use crate::interpreters::common::append2table;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterPtr;
+use crate::pipelines::processors::TransformAddOn;
 use crate::pipelines::PipelineBuildResult;
 use crate::pipelines::SourcePipeBuilder;
 use crate::schedulers::build_query_pipeline;
@@ -101,6 +111,100 @@ impl InsertInterpreterV2 {
         let cast_needed = select_schema != *output_schema;
         Ok(cast_needed)
     }
+
+    async fn build_insert_from_stage_pipeline(
+        &self,
+        table: Arc<dyn Table>,
+        attachment: Arc<StageAttachment>,
+        pipeline: &mut Pipeline,
+    ) -> Result<()> {
+        let start = Instant::now();
+        let ctx = self.ctx.clone();
+        let table_ctx: Arc<dyn TableContext> = ctx.clone();
+        let source_schema = self.plan.schema();
+        let target_schema = table.schema();
+        let catalog_name = self.plan.catalog.clone();
+        let overwrite = self.plan.overwrite;
+
+        let (mut stage_info, path) = parse_stage_location(&self.ctx, &attachment.location).await?;
+        stage_info.apply_format_options(&attachment.format_options)?;
+        stage_info.apply_copy_options(&attachment.copy_options)?;
+
+        let mut stage_table_info = StageTableInfo {
+            schema: source_schema.clone(),
+            user_stage_info: stage_info,
+            path: path.to_string(),
+            files: vec![],
+            pattern: "".to_string(),
+            files_to_copy: None,
+        };
+
+        let all_source_file_infos = StageTable::list_files(&table_ctx, &stage_table_info).await?;
+
+        tracing::info!(
+            "insert: read all stage attachment files finished: {}, elapsed:{}",
+            all_source_file_infos.len(),
+            start.elapsed().as_secs()
+        );
+
+        stage_table_info.files_to_copy = Some(all_source_file_infos.clone());
+        let stage_table = StageTable::try_create(stage_table_info.clone())?;
+        let read_source_plan = {
+            stage_table
+                .read_plan_with_catalog(ctx.clone(), catalog_name, None)
+                .await?
+        };
+
+        stage_table.read_data(table_ctx, &read_source_plan, pipeline)?;
+
+        let need_fill_missing_columns = target_schema != source_schema;
+        if need_fill_missing_columns {
+            pipeline.add_transform(|transform_input_port, transform_output_port| {
+                TransformAddOn::try_create(
+                    transform_input_port,
+                    transform_output_port,
+                    source_schema.clone(),
+                    target_schema.clone(),
+                    ctx.clone(),
+                )
+            })?;
+        }
+        table.append_data(ctx.clone(), pipeline, AppendMode::Copy, false)?;
+
+        pipeline.set_on_finished(move |may_error| {
+            // capture the outer variables
+            let overwrite = overwrite;
+            let ctx = ctx.clone();
+            let table = table.clone();
+
+            match may_error {
+                Some(error) => {
+                    tracing::error!("insert stage file error: {}", error);
+                    Err(error.clone())
+                }
+                None => {
+                    let append_entries = ctx.consume_precommit_blocks();
+                    // Run the commit on the global runtime to avoid tower's
+                    // "dispatch dropped without returning error".
+                    GlobalIORuntime::instance().block_on(async move {
+                        tracing::info!(
+                            "insert: try to commit append entries:{}, elapsed:{}",
+                            append_entries.len(),
+                            start.elapsed().as_secs()
+                        );
+                        table
+                            .commit_insertion(ctx, append_entries, overwrite)
+                            .await?;
+
+                        // TODO:(everpcpc) purge copied files
+
+                        Ok(())
+                    })
+                }
+            }
+        });
+
+        Ok(())
+    }
 }
 
 #[async_trait::async_trait]
@@ -156,6 +260,16 @@ impl Interpreter for InsertInterpreterV2 {
                     .format
                     .exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
             }
+            InsertInputSource::Stage(opts) => {
+                tracing::info!("insert: from stage with options {:?}", opts);
+                self.build_insert_from_stage_pipeline(
+                    table.clone(),
+                    opts.clone(),
+                    &mut build_res.main_pipeline,
+                )
+                .await?;
+                return Ok(build_res);
+            }
             InsertInputSource::SelectPlan(plan) => {
                 let table1 = table.clone();
                 let (mut select_plan, select_column_bindings) = match plan.as_ref() {
@@ -602,3 +716,26 @@ async fn exprs_to_datavalue<'a>(
     let datavalues: Vec<DataValue> = res.columns().iter().skip(1).map(|col| col.get(0)).collect();
     Ok(datavalues)
 }
+
+// TODO:(everpcpc) tmp copy from src/query/sql/src/planner/binder/copy.rs
+// move to user stage module
+async fn parse_stage_location(
+    ctx: &Arc<QueryContext>,
+    location: &str,
+) -> Result<(UserStageInfo, String)> {
+    let s: Vec<&str> = location.split('@').collect();
+    // @my_ext_stage/abc/
+    let names: Vec<&str> = s[1].splitn(2, '/').filter(|v| !v.is_empty()).collect();
+
+    let stage = if names[0] == "~" {
+        UserStageInfo::new_user_stage(&ctx.get_current_user()?.name)
+    } else {
+        UserApiProvider::instance()
+            .get_stage(&ctx.get_tenant(), names[0])
+            .await?
+    };
+
+    let path = names.get(1).unwrap_or(&"").trim_start_matches('/');
+
+    Ok((stage, path.to_string()))
+}
diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs
index 84cb50907a846..97805c9772671 100644
--- a/src/query/service/src/servers/http/v1/query/http_query.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query.rs
@@ -21,6 +21,7 @@ use common_base::base::tokio;
 use common_base::base::tokio::sync::Mutex as TokioMutex;
 use common_base::base::tokio::sync::RwLock;
 use common_base::runtime::TrySpawn;
+use common_catalog::table_context::StageAttachment;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use serde::Deserialize;
@@ -59,6 +60,7 @@ pub struct HttpQueryRequest {
     pub pagination: PaginationConf,
     #[serde(default = "default_as_true")]
     pub string_fields: bool,
+    pub stage_attachment: Option<StageAttachmentConf>,
 }
 
 const DEFAULT_MAX_ROWS_IN_BUFFER: usize = 5 * 1000 * 1000;
@@ -141,6 +143,15 @@ impl HttpSessionConf {
     }
 }
 
+#[derive(Deserialize, Debug, Clone)]
+pub struct StageAttachmentConf {
+    /// location of the stage
+    /// for example: @stage_name/path/to/file, @~/path/to/file
+    pub(crate) location: String,
+    pub(crate) format_options: Option<BTreeMap<String, String>>,
+    pub(crate) copy_options: Option<BTreeMap<String, String>>,
+}
+
 #[derive(Debug, Clone)]
 pub struct ResponseState {
     pub running_time_ms: f64,
@@ -229,6 +240,21 @@ impl HttpQuery {
         let sql = &request.sql;
         tracing::info!("run query_id={id} in session_id={session_id}, sql='{sql}'");
 
+        match &request.stage_attachment {
+            Some(attachment) => ctx.attach_stage(StageAttachment {
+                location: attachment.location.clone(),
+                format_options: match attachment.format_options {
+                    Some(ref params) => params.clone(),
+                    None => BTreeMap::new(),
+                },
+                copy_options: match attachment.copy_options {
+                    Some(ref params) => params.clone(),
+                    None => BTreeMap::new(),
+                },
+            }),
+            None => {}
+        };
+
         let (block_sender, block_receiver) = sized_spsc(request.pagination.max_rows_in_buffer);
         let start_time = Instant::now();
         let state = Arc::new(RwLock::new(Executor {
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index 2fbafa1378a28..5f26516d594cc 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -32,6 +32,7 @@ use common_catalog::plan::DataSourcePlan;
 use common_catalog::plan::PartInfoPtr;
 use common_catalog::plan::Partitions;
 use common_catalog::plan::StageTableInfo;
+use common_catalog::table_context::StageAttachment;
 use common_config::DATABEND_COMMIT_VERSION;
 use common_datablocks::DataBlock;
 use common_datavalues::DataValue;
@@ -192,6 +193,10 @@ impl QueryContext {
         self.shared.set_executor(weak_ptr)
     }
 
+    pub fn attach_stage(&self, attachment: StageAttachment) {
+        self.shared.attach_stage(attachment);
+    }
+
     pub fn get_created_time(&self) -> SystemTime {
         self.shared.created_time
     }
@@ -354,6 +359,11 @@ impl TableContext for QueryContext {
     fn get_processes_info(&self) -> Vec<ProcessInfo> {
         SessionManager::instance().processes_info()
     }
+
+    // Get Stage Attachment.
+    fn get_stage_attachment(&self) -> Option<StageAttachment> {
+        self.shared.get_stage_attachment()
+    }
 }
 
 impl TrySpawn for QueryContext {
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index 2b15b0db2befd..a66a0909e55b8 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -22,6 +22,7 @@ use std::time::SystemTime;
 
 use common_base::base::Progress;
 use common_base::runtime::Runtime;
+use common_catalog::table_context::StageAttachment;
 use common_config::Config;
 use common_datablocks::DataBlock;
 use common_exception::ErrorCode;
@@ -78,6 +79,7 @@ pub struct QueryContextShared {
     pub(in crate::sessions) data_operator: DataOperator,
     pub(in crate::sessions) executor: Arc<RwLock<Weak<PipelineExecutor>>>,
     pub(in crate::sessions) precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
+    pub(in crate::sessions) stage_attachment: Arc<RwLock<Option<StageAttachment>>>,
     pub(in crate::sessions) created_time: SystemTime,
 }
@@ -107,6 +109,7 @@ impl QueryContextShared {
             affect: Arc::new(Mutex::new(None)),
             executor: Arc::new(RwLock::new(Weak::new())),
             precommit_blocks: Arc::new(RwLock::new(vec![])),
+            stage_attachment: Arc::new(RwLock::new(None)),
             created_time: SystemTime::now(),
         }))
     }
@@ -316,6 +319,15 @@ impl QueryContextShared {
         swaped_precommit_blocks
     }
 
+    pub fn get_stage_attachment(&self) -> Option<StageAttachment> {
+        self.stage_attachment.read().clone()
+    }
+
+    pub fn attach_stage(&self, attachment: StageAttachment) {
+        let mut stage_attachment = self.stage_attachment.write();
+        *stage_attachment = Some(attachment);
+    }
+
     pub fn get_created_time(&self) -> SystemTime {
         self.created_time
     }
diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs
index 3cb2ab4cb7e94..844de4bc82df3 100644
--- a/src/query/service/tests/it/storages/fuse/operations/commit.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -27,6 +27,7 @@ use common_catalog::plan::PartInfoPtr;
 use common_catalog::plan::Partitions;
 use common_catalog::table::Table;
 use common_catalog::table_context::ProcessInfo;
+use common_catalog::table_context::StageAttachment;
 use common_catalog::table_context::TableContext;
 use common_datablocks::DataBlock;
 use common_exception::ErrorCode;
@@ -453,6 +454,10 @@ impl TableContext for CtxDelegation {
     fn get_processes_info(&self) -> Vec<ProcessInfo> {
         todo!()
     }
+
+    fn get_stage_attachment(&self) -> Option<StageAttachment> {
+        todo!()
+    }
 }
 
 #[derive(Clone)]
diff --git a/src/query/sql/src/planner/binder/insert.rs b/src/query/sql/src/planner/binder/insert.rs
index e1d95d1052823..a6beccc7e744c 100644
--- a/src/query/sql/src/planner/binder/insert.rs
+++ b/src/query/sql/src/planner/binder/insert.rs
@@ -96,10 +96,13 @@ impl<'a> Binder {
                     opts, start, None,
                 ))
             }
-            InsertSource::Values { rest_str } => {
-                let data = rest_str.trim_end_matches(';').trim_start().to_owned();
-                Ok(InsertInputSource::Values(data))
-            }
+            InsertSource::Values { rest_str } => match self.ctx.get_stage_attachment() {
+                Some(attachment) => Ok(InsertInputSource::Stage(Arc::new(attachment))),
+                None => {
+                    let data = rest_str.trim_end_matches(';').trim_start().to_owned();
+                    Ok(InsertInputSource::Values(data))
+                }
+            },
             InsertSource::Select { query } => {
                 let statement = Statement::Query(query);
                 let select_plan = self.bind_statement(bind_context, &statement).await?;
diff --git a/src/query/sql/src/planner/plans/insert.rs b/src/query/sql/src/planner/plans/insert.rs
index df63b2cb4658d..e0aadc873cdc7 100644
--- a/src/query/sql/src/planner/plans/insert.rs
+++ b/src/query/sql/src/planner/plans/insert.rs
@@ -14,6 +14,7 @@
 use std::sync::Arc;
 
+use common_catalog::table_context::StageAttachment;
 use common_datablocks::DataBlock;
 use common_datavalues::DataSchemaRef;
 use common_meta_types::FileFormatOptions;
@@ -31,6 +32,8 @@ pub enum InsertInputSource {
     StreamingWithFileFormat(FileFormatOptions, usize, Option<Arc<InputContext>>),
     // From cloned String and format
     Values(String),
+    // From stage
+    Stage(Arc<StageAttachment>),
 }
 
 #[derive(Clone)]
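
Reviewer note: `apply_format_options` and `apply_copy_options` share one pattern — iterate the client-supplied key/value map, parse each value with `FromStr`, and reject unknown keys instead of silently ignoring them. A minimal std-only sketch of that pattern; the `CopyOptions` struct and `String` error type below are simplified stand-ins for the real `common_meta_types` definitions, not the PR's code:

```rust
use std::collections::BTreeMap;
use std::str::FromStr;

/// Simplified stand-in for the PR's copy options (hypothetical, not the real type).
#[derive(Debug, Default)]
struct CopyOptions {
    size_limit: usize,
    purge: bool,
}

/// Mirror of the PR's parsing pattern: FromStr per value, hard error on unknown keys.
fn apply_copy_options(opts: &mut CopyOptions, kvs: &BTreeMap<String, String>) -> Result<(), String> {
    for (k, v) in kvs {
        match k.as_str() {
            "size_limit" => {
                opts.size_limit =
                    usize::from_str(v).map_err(|e| format!("Cannot parse size_limit: {}", e))?;
            }
            "purge" => {
                // bool::from_str only accepts "true"/"false", hence the custom error.
                opts.purge =
                    bool::from_str(v).map_err(|_| format!("Cannot parse purge: {} as bool", v))?;
            }
            // Unknown keys are rejected rather than ignored, matching the PR.
            _ => return Err(format!("Unknown stage copy option {}", k)),
        }
    }
    Ok(())
}

fn main() {
    let mut kvs = BTreeMap::new();
    kvs.insert("purge".to_string(), "true".to_string());
    kvs.insert("size_limit".to_string(), "100".to_string());

    let mut opts = CopyOptions::default();
    apply_copy_options(&mut opts, &kvs).unwrap();
    assert!(opts.purge && opts.size_limit == 100);

    // A misspelled key fails loudly instead of being silently dropped.
    kvs.insert("sise_limit".to_string(), "1".to_string());
    assert!(apply_copy_options(&mut opts, &kvs).is_err());
}
```

Erroring on unknown keys seems like the right call here: the options arrive as untyped strings over HTTP, so a typoed key would otherwise vanish without a trace.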
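With the new `stage_attachment` field on `HttpQueryRequest`, a client can bind a staged file to an `INSERT ... VALUES` statement. A hedged sketch of the JSON body shape, reconstructed from the `StageAttachmentConf` serde definition in this diff — the structs below mirror the PR's field names, but the surrounding request fields and the exact endpoint are assumptions, not shown here:

```rust
use std::collections::BTreeMap;

use serde::Deserialize;

/// Mirrors the PR's StageAttachmentConf field-for-field so an example body round-trips.
#[derive(Deserialize, Debug)]
struct StageAttachmentConf {
    location: String,
    format_options: Option<BTreeMap<String, String>>,
    copy_options: Option<BTreeMap<String, String>>,
}

/// Only the fields needed for the example; the real HttpQueryRequest has more.
#[derive(Deserialize, Debug)]
struct RequestSketch {
    sql: String,
    stage_attachment: Option<StageAttachmentConf>,
}

fn main() {
    // Example body a client might POST to the HTTP query handler.
    let body = r#"{
        "sql": "INSERT INTO t1 VALUES",
        "stage_attachment": {
            "location": "@my_stage/data.csv",
            "format_options": {"format": "CSV", "skip_header": "1"},
            "copy_options": {"purge": "true"}
        }
    }"#;

    let req: RequestSketch = serde_json::from_str(body).unwrap();
    let att = req.stage_attachment.unwrap();
    assert_eq!(att.location, "@my_stage/data.csv");
    assert_eq!(req.sql, "INSERT INTO t1 VALUES");
}
```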
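Finally, `parse_stage_location` (a temporary copy, per its own TODO) does the `@stage_name/path` split with `split('@')` plus `splitn(2, '/')`. A std-only sketch of just the string handling, without the `UserApiProvider` lookup, covering the `@~` user-stage case:

```rust
/// Std-only sketch of the string handling in parse_stage_location:
/// split "@my_stage/a/b.csv" into ("my_stage", "a/b.csv").
/// The real helper then resolves the name via UserApiProvider
/// (or new_user_stage for "~"); that lookup is omitted here.
fn split_stage_location(location: &str) -> Option<(&str, &str)> {
    // Unlike the PR's `s[1]`, `nth(1)?` returns None instead of
    // panicking when the location has no '@'.
    let after_at = location.split('@').nth(1)?;
    let mut parts = after_at.splitn(2, '/').filter(|v| !v.is_empty());
    let name = parts.next()?;
    let path = parts.next().unwrap_or("").trim_start_matches('/');
    Some((name, path))
}

fn main() {
    assert_eq!(
        split_stage_location("@my_stage/a/b.csv"),
        Some(("my_stage", "a/b.csv"))
    );
    // "~" selects the current user's stage in the real helper.
    assert_eq!(split_stage_location("@~/b.csv"), Some(("~", "b.csv")));
    assert_eq!(split_stage_location("no_at_sign"), None);
}
```

As the `None` case above suggests, the PR's `s[1]` indexing will panic on a location without an `@`; a guard there may be worth adding before the TODO is resolved.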