diff --git a/src/query/pipeline/sources/src/processors/sources/mod.rs b/src/query/pipeline/sources/src/processors/sources/mod.rs index 4c20fb571d7bb..d8283bf320b34 100644 --- a/src/query/pipeline/sources/src/processors/sources/mod.rs +++ b/src/query/pipeline/sources/src/processors/sources/mod.rs @@ -20,7 +20,6 @@ pub mod file_splitter; pub mod multi_file_splitter; mod one_block_source; pub mod stream_source; -pub mod stream_source_v2; pub mod sync_source; pub mod sync_source_receiver; @@ -37,7 +36,6 @@ pub use multi_file_splitter::OperatorInfo; pub use one_block_source::OneBlockSource; pub use stream_source::StreamSource; pub use stream_source::StreamSourceNoSkipEmpty; -pub use stream_source_v2::StreamSourceV2; pub use sync_source::SyncSource; pub use sync_source::SyncSourcer; pub use sync_source::*; diff --git a/src/query/pipeline/sources/src/processors/sources/stream_source_v2.rs b/src/query/pipeline/sources/src/processors/sources/stream_source_v2.rs deleted file mode 100644 index 1c411d451d0cc..0000000000000 --- a/src/query/pipeline/sources/src/processors/sources/stream_source_v2.rs +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use common_catalog::table_context::TableContext; -use common_datablocks::DataBlock; -use common_exception::Result; -use common_pipeline_core::processors::port::OutputPort; -use common_pipeline_core::processors::processor::ProcessorPtr; -use common_streams::Source; - -use crate::processors::sources::AsyncSource; -use crate::processors::sources::AsyncSourcer; - -pub struct StreamSourceV2 { - s: Box, -} - -impl StreamSourceV2 { - pub fn create( - ctx: Arc, - s: Box, - out: Arc, - ) -> Result { - AsyncSourcer::create(ctx, out, StreamSourceV2 { s }) - } -} - -#[async_trait::async_trait] -impl AsyncSource for StreamSourceV2 { - const NAME: &'static str = "stream source"; - - #[async_trait::unboxed_simple] - async fn generate(&mut self) -> Result> { - self.s.read().await - } -} diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs index e93e95579628f..20a518781c6bc 100644 --- a/src/query/service/src/interpreters/interpreter_insert_v2.rs +++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs @@ -12,32 +12,63 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::VecDeque; +use std::borrow::Cow; +use std::io::Cursor; +use std::ops::Not; use std::sync::Arc; +use common_ast::ast::Expr; +use common_ast::parser::parse_comma_separated_exprs; +use common_ast::parser::tokenize_sql; +use common_ast::Backtrace; use common_base::base::GlobalIORuntime; use common_base::base::TrySpawn; +use common_datablocks::DataBlock; +use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; +use common_formats::FormatFactory; +use common_formats::InputFormat; +use common_io::prelude::BufferRead; +use common_io::prelude::BufferReadExt; +use common_io::prelude::BufferReader; +use common_io::prelude::FileSplitCow; +use common_io::prelude::NestedCheckpointReader; +use common_pipeline_sources::processors::sources::AsyncSource; +use common_pipeline_sources::processors::sources::AsyncSourcer; +use common_pipeline_sources::processors::sources::SyncSource; +use common_pipeline_sources::processors::sources::SyncSourcer; +use common_pipeline_transforms::processors::transforms::Transform; use parking_lot::Mutex; +use parking_lot::RwLock; use super::interpreter_common::append2table; use super::plan_schedulers::build_schedule_pipepline; +use crate::evaluator::EvalNode; +use crate::evaluator::Evaluator; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterPtr; use crate::pipelines::processors::port::OutputPort; -use crate::pipelines::processors::BlocksSource; +use crate::pipelines::processors::transforms::ExpressionTransformV2; use crate::pipelines::PipelineBuildResult; use crate::pipelines::SourcePipeBuilder; use crate::sessions::QueryContext; use crate::sessions::TableContext; +use crate::sql::binder::ScalarBinder; use crate::sql::executor::DistributedInsertSelect; use crate::sql::executor::PhysicalPlan; use crate::sql::executor::PhysicalPlanBuilder; use crate::sql::executor::PipelineBuilder; +use crate::sql::plans::CastExpr; +use crate::sql::plans::ConstantExpr; use crate::sql::plans::Insert; use crate::sql::plans::InsertInputSource; use crate::sql::plans::Plan; +use crate::sql::plans::Scalar; +use crate::sql::BindContext; +use crate::sql::Metadata; +use crate::sql::MetadataRef; +use crate::sql::NameResolutionContext; pub struct InsertInterpreterV2 { ctx: Arc, @@ -85,7 +116,6 @@ impl Interpreter for InsertInterpreterV2 { async fn execute2(&self) -> Result { let plan = &self.plan; - let settings = self.ctx.get_settings(); let table = self .ctx .get_table(&plan.catalog, &plan.database, &plan.table) @@ -102,16 +132,42 @@ impl Interpreter for InsertInterpreterV2 { ); } else { match &self.plan.source { - InsertInputSource::Values(values) => { - let blocks = - Arc::new(Mutex::new(VecDeque::from_iter(vec![values.block.clone()]))); - - for _index in 0..settings.get_max_threads()? 
{ - let output = OutputPort::create(); - builder.add_source( - output.clone(), - BlocksSource::create(self.ctx.clone(), output.clone(), blocks.clone())?, - ); + InsertInputSource::StrWithFormat((str, format)) => { + let output_port = OutputPort::create(); + let format_settings = self.ctx.get_format_settings()?; + match format.as_str() { + "VALUES" => { + let settings = self.ctx.get_settings(); + let name_resolution_ctx = + NameResolutionContext::try_from(settings.as_ref())?; + let inner = ValueSource::new( + str.to_string(), + self.ctx.clone(), + name_resolution_ctx, + plan.schema(), + ); + let source = + AsyncSourcer::create(self.ctx.clone(), output_port.clone(), inner)?; + builder.add_source(output_port, source); + } + + _ => { + let input_format = FormatFactory::instance().get_input( + format.as_str(), + plan.schema(), + format_settings, + )?; + let inner = FormatSource { + data: str.to_string(), + input_format, + blocks: vec![], + is_finished: false, + }; + + let source = + SyncSourcer::create(self.ctx.clone(), output_port.clone(), inner)?; + builder.add_source(output_port, source); + } } build_res.main_pipeline.add_pipe(builder.finalize()); } @@ -238,3 +294,374 @@ impl Interpreter for InsertInterpreterV2 { Ok(()) } } + +struct FormatSource { + data: String, + input_format: Arc, + blocks: Vec, + is_finished: bool, +} + +impl SyncSource for FormatSource { + const NAME: &'static str = "FormatSource"; + + fn generate(&mut self) -> Result> { + if self.is_finished { + return Ok(None); + } + + if self.blocks.is_empty() { + let data_slice = self.data.as_bytes(); + let mut input_state = self.input_format.create_state(); + let skip_size = self + .input_format + .skip_header(data_slice, &mut input_state, 0)?; + + let split = FileSplitCow { + path: None, + start_offset: 0, + start_row: 0, + buf: Cow::from(&data_slice[skip_size..]), + }; + self.blocks = self.input_format.deserialize_complete_split(split)?; + } + + if !self.blocks.is_empty() { + let block = self.blocks.remove(0); + if self.blocks.is_empty() { + self.is_finished = true; + } + return Ok(Some(block)); + } + Ok(None) + } +} + +pub struct ValueSource { + data: String, + ctx: Arc, + name_resolution_ctx: NameResolutionContext, + bind_context: BindContext, + schema: DataSchemaRef, + metadata: MetadataRef, + is_finished: bool, +} + +#[async_trait::async_trait] +impl AsyncSource for ValueSource { + const NAME: &'static str = "ValueSource"; + const SKIP_EMPTY_DATA_BLOCK: bool = true; + + async fn generate(&mut self) -> Result> { + if self.is_finished { + return Ok(None); + } + let cursor = Cursor::new(self.data.as_bytes()); + let mut reader = NestedCheckpointReader::new(BufferReader::new(cursor)); + let block = self.read(&mut reader).await?; + self.is_finished = true; + Ok(Some(block)) + } +} + +impl ValueSource { + pub fn new( + data: String, + ctx: Arc, + name_resolution_ctx: NameResolutionContext, + schema: DataSchemaRef, + ) -> Self { + let bind_context = BindContext::new(); + let metadata = Arc::new(RwLock::new(Metadata::create())); + + Self { + data, + ctx, + name_resolution_ctx, + schema, + bind_context, + metadata, + is_finished: false, + } + } + + pub async fn read( + &self, + reader: &mut NestedCheckpointReader, + ) -> Result { + let mut desers = self + .schema + .fields() + .iter() + .map(|f| f.data_type().create_deserializer(1024)) + .collect::>(); + + let col_size = desers.len(); + let mut rows = 0; + + loop { + let _ = reader.ignore_white_spaces()?; + if !reader.has_data_left()? 
{ + break; + } + // Not the first row + if rows != 0 { + reader.must_ignore_byte(b',')?; + } + + self.parse_next_row( + reader, + col_size, + &mut desers, + &self.bind_context, + self.metadata.clone(), + ) + .await?; + rows += 1; + } + + if rows == 0 { + return Ok(DataBlock::empty_with_schema(self.schema.clone())); + } + + let columns = desers + .iter_mut() + .map(|deser| deser.finish_to_column()) + .collect::>(); + + Ok(DataBlock::create(self.schema.clone(), columns)) + } + + /// Parse single row value, like ('111', 222, 1 + 1) + async fn parse_next_row( + &self, + reader: &mut NestedCheckpointReader, + col_size: usize, + desers: &mut [TypeDeserializerImpl], + bind_context: &BindContext, + metadata: MetadataRef, + ) -> Result<()> { + let _ = reader.ignore_white_spaces()?; + reader.push_checkpoint(); + + // Start of the row --- '(' + if !reader.ignore_byte(b'(')? { + return Err(ErrorCode::BadDataValueType( + "Must start with parentheses".to_string(), + )); + } + + let format = self.ctx.get_format_settings()?; + for col_idx in 0..col_size { + let _ = reader.ignore_white_spaces()?; + let col_end = if col_idx + 1 == col_size { b')' } else { b',' }; + + let deser = desers + .get_mut(col_idx) + .ok_or_else(|| ErrorCode::BadBytes("Deserializer is None"))?; + + let (need_fallback, pop_count) = deser + .de_text_quoted(reader, &format) + .and_then(|_| { + let _ = reader.ignore_white_spaces()?; + let need_fallback = reader.ignore_byte(col_end)?.not(); + Ok((need_fallback, col_idx + 1)) + }) + .unwrap_or((true, col_idx)); + + // Deserializer and expr-parser both will eat the end ')' of the row. + if need_fallback { + for deser in desers.iter_mut().take(pop_count) { + deser.pop_data_value()?; + } + skip_to_next_row(reader, 1)?; + + // Parse from expression and append all columns. 
+ let buf = reader.get_checkpoint_buffer(); + + let sql = std::str::from_utf8(buf).unwrap(); + let settings = self.ctx.get_settings(); + let sql_dialect = settings.get_sql_dialect()?; + let tokens = tokenize_sql(sql)?; + let backtrace = Backtrace::new(); + let exprs = parse_comma_separated_exprs( + &tokens[1..tokens.len() as usize], + sql_dialect, + &backtrace, + )?; + + let values = exprs_to_datavalue( + exprs, + &self.schema, + self.ctx.clone(), + &self.name_resolution_ctx, + bind_context, + metadata, + ) + .await?; + + reader.pop_checkpoint(); + + for (append_idx, deser) in desers.iter_mut().enumerate().take(col_size) { + deser.append_data_value(values[append_idx].clone(), &format)?; + } + + return Ok(()); + } + } + + reader.pop_checkpoint(); + Ok(()) + } +} + +// Values |(xxx), (yyy), (zzz) +pub fn skip_to_next_row( + reader: &mut NestedCheckpointReader, + mut balance: i32, +) -> Result<()> { + let _ = reader.ignore_white_spaces()?; + + let mut quoted = false; + let mut escaped = false; + + while balance > 0 { + let buffer = reader.fill_buf()?; + if buffer.is_empty() { + break; + } + + let size = buffer.len(); + + let it = buffer + .iter() + .position(|&c| c == b'(' || c == b')' || c == b'\\' || c == b'\''); + + if let Some(it) = it { + let c = buffer[it]; + reader.consume(it + 1); + + if it == 0 && escaped { + escaped = false; + continue; + } + escaped = false; + + match c { + b'\\' => { + escaped = true; + continue; + } + b'\'' => { + quoted ^= true; + continue; + } + b')' => { + if !quoted { + balance -= 1; + } + } + b'(' => { + if !quoted { + balance += 1; + } + } + _ => {} + } + } else { + escaped = false; + reader.consume(size); + } + } + Ok(()) +} + +fn fill_default_value(expressions: &mut Vec<(EvalNode, String)>, field: &DataField) -> Result<()> { + if let Some(default_expr) = field.default_expr() { + expressions.push(( + Evaluator::eval_physical_scalar(&serde_json::from_str(default_expr)?)?, + field.name().to_string(), + )); + } else { + // If field data type is nullable, then we'll fill it with null. + if field.data_type().is_nullable() { + let scalar = Scalar::ConstantExpr(ConstantExpr { + value: DataValue::Null, + data_type: Box::new(field.data_type().clone()), + }); + expressions.push((Evaluator::eval_scalar(&scalar)?, field.name().to_string())); + } else { + expressions.push(( + Evaluator::eval_scalar(&Scalar::ConstantExpr(ConstantExpr { + value: field.data_type().default_value(), + data_type: Box::new(field.data_type().clone()), + }))?, + field.name().to_string(), + )); + } + } + Ok(()) +} + +async fn exprs_to_datavalue<'a>( + exprs: Vec>, + schema: &DataSchemaRef, + ctx: Arc, + name_resolution_ctx: &NameResolutionContext, + bind_context: &BindContext, + metadata: MetadataRef, +) -> Result> { + let schema_fields_len = schema.fields().len(); + if exprs.len() > schema_fields_len { + return Err(ErrorCode::LogicalError( + "Column count shouldn't be more than the number of schema", + )); + } + if exprs.len() < schema_fields_len { + return Err(ErrorCode::LogicalError( + "Column count doesn't match value count", + )); + } + let mut expressions = Vec::with_capacity(schema_fields_len); + for (i, expr) in exprs.iter().enumerate() { + // `DEFAULT` in insert values will be parsed as `Expr::ColumnRef`. + if let Expr::ColumnRef { column, .. 
} = expr { + if column.name.eq_ignore_ascii_case("default") { + let field = schema.field(i); + fill_default_value(&mut expressions, field)?; + continue; + } + } + let mut scalar_binder = ScalarBinder::new( + bind_context, + ctx.clone(), + name_resolution_ctx, + metadata.clone(), + &[], + ); + let (mut scalar, data_type) = scalar_binder.bind(expr).await?; + let field_data_type = schema.field(i).data_type(); + if data_type.ne(field_data_type) { + scalar = Scalar::CastExpr(CastExpr { + argument: Box::new(scalar), + from_type: Box::new(data_type), + target_type: Box::new(field_data_type.clone()), + }) + } + expressions.push(( + Evaluator::eval_scalar(&scalar)?, + schema.field(i).name().to_string(), + )); + } + + let dummy = DataSchemaRefExt::create(vec![DataField::new("dummy", u8::to_data_type())]); + let one_row_block = DataBlock::create(dummy, vec![Series::from_data(vec![1u8])]); + let func_ctx = ctx.try_get_function_context()?; + let mut expression_transform = ExpressionTransformV2 { + expressions, + func_ctx, + }; + let res = expression_transform.transform(one_row_block)?; + let datavalues: Vec = res.columns().iter().skip(1).map(|col| col.get(0)).collect(); + Ok(datavalues) +} diff --git a/src/query/service/src/pipelines/processors/mod.rs b/src/query/service/src/pipelines/processors/mod.rs index 495cb7c51f892..1e48227c03956 100644 --- a/src/query/service/src/pipelines/processors/mod.rs +++ b/src/query/service/src/pipelines/processors/mod.rs @@ -35,7 +35,6 @@ pub use sources::FileSplitterState; pub use sources::MultiFileSplitter; pub use sources::OperatorInfo; pub use sources::StreamSource; -pub use sources::StreamSourceV2; pub use sources::SyncSource; pub use sources::SyncSourcer; pub use transforms::AggregatorParams; diff --git a/src/query/service/src/sql/planner/binder/insert.rs b/src/query/service/src/sql/planner/binder/insert.rs index d328172a65444..9818fe0dcbf4c 100644 --- a/src/query/service/src/sql/planner/binder/insert.rs +++ b/src/query/service/src/sql/planner/binder/insert.rs @@ -12,54 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::borrow::Cow; -use std::io::Cursor; -use std::ops::Not; use std::sync::Arc; -use common_ast::ast::Expr; use common_ast::ast::InsertSource; use common_ast::ast::InsertStmt; use common_ast::ast::Statement; -use common_ast::parser::parse_comma_separated_exprs; -use common_ast::parser::tokenize_sql; -use common_ast::Backtrace; -use common_datablocks::DataBlock; -use common_datavalues::prelude::*; -use common_datavalues::DataSchemaRef; use common_datavalues::DataSchemaRefExt; -use common_datavalues::DataType; -use common_datavalues::TypeDeserializer; -use common_datavalues::TypeDeserializerImpl; -use common_exception::ErrorCode; use common_exception::Result; -use common_formats::FormatFactory; -use common_io::prelude::BufferReader; -use common_io::prelude::*; -use common_pipeline_transforms::processors::transforms::Transform; use tracing::debug; use crate::clusters::ClusterHelper; -use crate::evaluator::EvalNode; -use crate::evaluator::Evaluator; -use crate::pipelines::processors::transforms::ExpressionTransformV2; -use crate::sessions::TableContext; use crate::sql::binder::Binder; -use crate::sql::binder::ScalarBinder; use crate::sql::normalize_identifier; use crate::sql::optimizer::optimize; use crate::sql::optimizer::OptimizerConfig; use crate::sql::optimizer::OptimizerContext; -use crate::sql::planner::semantic::NameResolutionContext; -use crate::sql::plans::CastExpr; -use crate::sql::plans::ConstantExpr; use crate::sql::plans::Insert; use crate::sql::plans::InsertInputSource; -use crate::sql::plans::InsertValueBlock; use crate::sql::plans::Plan; -use crate::sql::plans::Scalar; use crate::sql::BindContext; -use crate::sql::MetadataRef; impl<'a> Binder { pub(in crate::sql::planner::binder) async fn bind_insert( @@ -109,18 +80,12 @@ impl<'a> Binder { let input_source: Result = match source.clone() { InsertSource::Streaming { format, rest_str } => { - self.analyze_stream_format(bind_context, rest_str, Some(format), schema.clone()) - .await + self.analyze_stream_format(rest_str, Some(format)).await } InsertSource::Values { rest_str, .. 
} => { let str = rest_str.trim_end_matches(';'); - self.analyze_stream_format( - bind_context, - str, - Some("VALUES".to_string()), - schema.clone(), - ) - .await + self.analyze_stream_format(str, Some("VALUES".to_string())) + .await } InsertSource::Select { query } => { let statement = Statement::Query(query); @@ -148,356 +113,17 @@ impl<'a> Binder { pub(in crate::sql::planner::binder) async fn analyze_stream_format( &self, - bind_context: &BindContext, stream_str: &'a str, format: Option, - schema: DataSchemaRef, ) -> Result { let stream_str = stream_str.trim_start(); debug!("{:?}", stream_str); - let settings = self.ctx.get_format_settings()?; - // TODO migrate format into format factory let format = format.map(|v| v.to_uppercase()); - match format.as_deref() { - Some("VALUES") | None => { - let bytes = stream_str.as_bytes(); - let cursor = Cursor::new(bytes); - let mut reader = NestedCheckpointReader::new(BufferReader::new(cursor)); - let source = ValueSourceV2::new( - self.ctx.clone(), - &self.name_resolution_ctx, - bind_context, - schema, - self.metadata.clone(), - ); - let block = source.read(&mut reader).await?; - Ok(InsertInputSource::Values(InsertValueBlock { block })) - } - // format factory - Some(name) => { - let input_format = FormatFactory::instance().get_input(name, schema, settings)?; - - let data_slice = stream_str.as_bytes(); - let mut input_state = input_format.create_state(); - let skip_size = input_format.skip_header(data_slice, &mut input_state, 0)?; - - let split = FileSplitCow { - path: None, - start_offset: 0, - start_row: 0, - buf: Cow::from(&data_slice[skip_size..]), - }; - - let blocks = input_format.deserialize_complete_split(split)?; - let block = DataBlock::concat_blocks(&blocks)?; - Ok(InsertInputSource::Values(InsertValueBlock { block })) - } - } - } -} - -pub struct ValueSourceV2<'a> { - ctx: Arc, - name_resolution_ctx: &'a NameResolutionContext, - bind_context: &'a BindContext, - schema: DataSchemaRef, - metadata: MetadataRef, -} - -impl<'a> ValueSourceV2<'a> { - pub fn new( - ctx: Arc, - name_resolution_ctx: &'a NameResolutionContext, - bind_context: &'a BindContext, - schema: DataSchemaRef, - metadata: MetadataRef, - ) -> Self { - Self { - ctx, - name_resolution_ctx, - schema, - bind_context, - metadata, - } + // TODO migrate format into format factory and avoid the str clone + let data = stream_str.to_owned(); + Ok(InsertInputSource::StrWithFormat(( + data, + format.unwrap_or_else(|| "VALUES".to_string()), + ))) } - - pub async fn read( - &self, - reader: &mut NestedCheckpointReader, - ) -> Result { - let mut desers = self - .schema - .fields() - .iter() - .map(|f| f.data_type().create_deserializer(1024)) - .collect::>(); - - let col_size = desers.len(); - let mut rows = 0; - - loop { - let _ = reader.ignore_white_spaces()?; - if !reader.has_data_left()? 
{ - break; - } - // Not the first row - if rows != 0 { - reader.must_ignore_byte(b',')?; - } - - self.parse_next_row( - reader, - col_size, - &mut desers, - self.bind_context, - self.metadata.clone(), - ) - .await?; - rows += 1; - } - - if rows == 0 { - return Ok(DataBlock::empty_with_schema(self.schema.clone())); - } - - let columns = desers - .iter_mut() - .map(|deser| deser.finish_to_column()) - .collect::>(); - - Ok(DataBlock::create(self.schema.clone(), columns)) - } - - /// Parse single row value, like ('111', 222, 1 + 1) - async fn parse_next_row( - &self, - reader: &mut NestedCheckpointReader, - col_size: usize, - desers: &mut [TypeDeserializerImpl], - bind_context: &BindContext, - metadata: MetadataRef, - ) -> Result<()> { - let _ = reader.ignore_white_spaces()?; - reader.push_checkpoint(); - - // Start of the row --- '(' - if !reader.ignore_byte(b'(')? { - return Err(ErrorCode::BadDataValueType( - "Must start with parentheses".to_string(), - )); - } - - let format = self.ctx.get_format_settings()?; - for col_idx in 0..col_size { - let _ = reader.ignore_white_spaces()?; - let col_end = if col_idx + 1 == col_size { b')' } else { b',' }; - - let deser = desers - .get_mut(col_idx) - .ok_or_else(|| ErrorCode::BadBytes("Deserializer is None"))?; - - let (need_fallback, pop_count) = deser - .de_text_quoted(reader, &format) - .and_then(|_| { - let _ = reader.ignore_white_spaces()?; - let need_fallback = reader.ignore_byte(col_end)?.not(); - Ok((need_fallback, col_idx + 1)) - }) - .unwrap_or((true, col_idx)); - - // Deserializer and expr-parser both will eat the end ')' of the row. - if need_fallback { - for deser in desers.iter_mut().take(pop_count) { - deser.pop_data_value()?; - } - skip_to_next_row(reader, 1)?; - - // Parse from expression and append all columns. 
- let buf = reader.get_checkpoint_buffer(); - - let sql = std::str::from_utf8(buf).unwrap(); - let settings = self.ctx.get_settings(); - let sql_dialect = settings.get_sql_dialect()?; - let tokens = tokenize_sql(sql)?; - let backtrace = Backtrace::new(); - let exprs = parse_comma_separated_exprs( - &tokens[1..tokens.len() as usize], - sql_dialect, - &backtrace, - )?; - - let values = exprs_to_datavalue( - exprs, - &self.schema, - self.ctx.clone(), - self.name_resolution_ctx, - bind_context, - metadata, - ) - .await?; - - reader.pop_checkpoint(); - - for (append_idx, deser) in desers.iter_mut().enumerate().take(col_size) { - deser.append_data_value(values[append_idx].clone(), &format)?; - } - - return Ok(()); - } - } - - reader.pop_checkpoint(); - Ok(()) - } -} - -// Values |(xxx), (yyy), (zzz) -pub fn skip_to_next_row( - reader: &mut NestedCheckpointReader, - mut balance: i32, -) -> Result<()> { - let _ = reader.ignore_white_spaces()?; - - let mut quoted = false; - let mut escaped = false; - - while balance > 0 { - let buffer = reader.fill_buf()?; - if buffer.is_empty() { - break; - } - - let size = buffer.len(); - - let it = buffer - .iter() - .position(|&c| c == b'(' || c == b')' || c == b'\\' || c == b'\''); - - if let Some(it) = it { - let c = buffer[it]; - reader.consume(it + 1); - - if it == 0 && escaped { - escaped = false; - continue; - } - escaped = false; - - match c { - b'\\' => { - escaped = true; - continue; - } - b'\'' => { - quoted ^= true; - continue; - } - b')' => { - if !quoted { - balance -= 1; - } - } - b'(' => { - if !quoted { - balance += 1; - } - } - _ => {} - } - } else { - escaped = false; - reader.consume(size); - } - } - Ok(()) -} - -fn fill_default_value(expressions: &mut Vec<(EvalNode, String)>, field: &DataField) -> Result<()> { - if let Some(default_expr) = field.default_expr() { - expressions.push(( - Evaluator::eval_physical_scalar(&serde_json::from_str(default_expr)?)?, - field.name().to_string(), - )); - } else { - // If field data type is nullable, then we'll fill it with null. - if field.data_type().is_nullable() { - let scalar = Scalar::ConstantExpr(ConstantExpr { - value: DataValue::Null, - data_type: Box::new(field.data_type().clone()), - }); - expressions.push((Evaluator::eval_scalar(&scalar)?, field.name().to_string())); - } else { - expressions.push(( - Evaluator::eval_scalar(&Scalar::ConstantExpr(ConstantExpr { - value: field.data_type().default_value(), - data_type: Box::new(field.data_type().clone()), - }))?, - field.name().to_string(), - )); - } - } - Ok(()) -} - -async fn exprs_to_datavalue<'a>( - exprs: Vec>, - schema: &DataSchemaRef, - ctx: Arc, - name_resolution_ctx: &NameResolutionContext, - bind_context: &BindContext, - metadata: MetadataRef, -) -> Result> { - let schema_fields_len = schema.fields().len(); - if exprs.len() > schema_fields_len { - return Err(ErrorCode::LogicalError( - "Column count shouldn't be more than the number of schema", - )); - } - if exprs.len() < schema_fields_len { - return Err(ErrorCode::LogicalError( - "Column count doesn't match value count", - )); - } - let mut expressions = Vec::with_capacity(schema_fields_len); - for (i, expr) in exprs.iter().enumerate() { - // `DEFAULT` in insert values will be parsed as `Expr::ColumnRef`. - if let Expr::ColumnRef { column, .. 
} = expr { - if column.name.eq_ignore_ascii_case("default") { - let field = schema.field(i); - fill_default_value(&mut expressions, field)?; - continue; - } - } - let mut scalar_binder = ScalarBinder::new( - bind_context, - ctx.clone(), - name_resolution_ctx, - metadata.clone(), - &[], - ); - let (mut scalar, data_type) = scalar_binder.bind(expr).await?; - let field_data_type = schema.field(i).data_type(); - if data_type.ne(field_data_type) { - scalar = Scalar::CastExpr(CastExpr { - argument: Box::new(scalar), - from_type: Box::new(data_type), - target_type: Box::new(field_data_type.clone()), - }) - } - expressions.push(( - Evaluator::eval_scalar(&scalar)?, - schema.field(i).name().to_string(), - )); - } - - let dummy = DataSchemaRefExt::create(vec![DataField::new("dummy", u8::to_data_type())]); - let one_row_block = DataBlock::create(dummy, vec![Series::from_data(vec![1u8])]); - let func_ctx = ctx.try_get_function_context()?; - let mut expression_transform = ExpressionTransformV2 { - expressions, - func_ctx, - }; - let res = expression_transform.transform(one_row_block)?; - let datavalues: Vec = res.columns().iter().skip(1).map(|col| col.get(0)).collect(); - Ok(datavalues) } diff --git a/src/query/service/src/sql/planner/plans/insert.rs b/src/query/service/src/sql/planner/plans/insert.rs index 3b2dc8e465570..bee479f3b2c1d 100644 --- a/src/query/service/src/sql/planner/plans/insert.rs +++ b/src/query/service/src/sql/planner/plans/insert.rs @@ -22,8 +22,10 @@ use super::Plan; pub enum InsertInputSource { #[serde(skip)] SelectPlan(Box), + // From outside streaming source StreamingWithFormat(String), - Values(InsertValueBlock), + // From cloned String and format + StrWithFormat((String, String)), } #[derive(serde::Serialize, serde::Deserialize, Clone)] @@ -65,7 +67,7 @@ impl Insert { match &self.source { InsertInputSource::SelectPlan(_) => None, InsertInputSource::StreamingWithFormat(v) => Some(v.as_str()), - InsertInputSource::Values(_) => Some("values"), + InsertInputSource::StrWithFormat((_, v)) => Some(v.as_str()), } } } diff --git a/src/query/service/tests/it/servers/http/clickhouse_handler.rs b/src/query/service/tests/it/servers/http/clickhouse_handler.rs index 3627da6f3c8d8..2aa2585316c46 100644 --- a/src/query/service/tests/it/servers/http/clickhouse_handler.rs +++ b/src/query/service/tests/it/servers/http/clickhouse_handler.rs @@ -270,7 +270,7 @@ async fn test_insert_format_ndjson() -> PoemResult<()> { let (status, body) = server .post("insert into table t1 format JSONEachRow", &body) .await; - assert_eq!(status, StatusCode::BAD_REQUEST); + assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR); assert_error!(body, "column a"); } Ok(()) diff --git a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.result b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.result index 9309fc4e7e453..23ddb3a453145 100644 --- a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.result +++ b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.result @@ -2,7 +2,7 @@ Code: 4000, displayText = invalid data (Expected to have char '.) Error occurs at row: 1: Column: 0, Name: a, Type: Timestamp, Parsed text: "19892-02-0" ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss format. -. + (while in processor thread 0). 
Code: 1046, displayText = Parse csv error at line 0 Error occurs at row: 0: @@ -10,7 +10,7 @@ Error occurs at row: 0: Column: 1, Name: b, Type: String, Parsed text: "Hello" Column: 2, Name: c, Type: Int32, Parsed text: "12345678" Error: There is no line feed. "," found instead. -. + (while in processor thread 0). Code: 1046, displayText = Parse csv error at line 0 Error occurs at row: 0: @@ -18,13 +18,13 @@ Error occurs at row: 0: Column: 1, Name: b, Type: String, Parsed text: "" Column: 2, Name: c, Type: Int32, Parsed text: "123" Error: There is no line feed. "H" found instead. -. + (while in processor thread 0). Code: 1046, displayText = Cannot parse value:'2023-04-0 to Date type, cause: input contains invalid characters Error occurs at row: 0: Column: 0, Name: a, Type: Timestamp, Parsed text: "2023-04-0" ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss format. -. + (while in processor thread 0). Code: 1046, displayText = Parse Tsv error at line 1 Error occurs at row: 1: @@ -32,7 +32,7 @@ Error occurs at row: 1: Column: 1, Name: b, Type: String, Parsed text: "World" Column: 2, Name: c, Type: Int32, Parsed text: "123456" Error: There is no line feed. "1" found instead. -. + (while in processor thread 0). Code: 1046, displayText = Parse Tsv error at line 0 Error occurs at row: 0: @@ -40,4 +40,4 @@ Error occurs at row: 0: Column: 1, Name: b, Type: String, Parsed text: "" Column: 2, Name: c, Type: Int32, Parsed text: "123" Error: There is no line feed. "H" found instead. -. \ No newline at end of file + (while in processor thread 0). \ No newline at end of file
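
For context on the overall shape of this change: binding no longer materializes a DataBlock for `INSERT ... VALUES` / `FORMAT`; it now carries the raw string plus the format name in `InsertInputSource::StrWithFormat`, and the interpreter picks a pipeline source from that pair (an async `ValueSource` for VALUES, a sync `FormatSource` backed by the format factory otherwise). Below is a minimal, self-contained sketch of that dispatch; the enum and trait are simplified stand-ins, not the actual databend types.

```rust
// Simplified stand-ins for the patch's types; only the dispatch shape is kept.
enum InsertInputSource {
    StrWithFormat((String, String)),
}

trait RowSource {
    fn name(&self) -> &'static str;
}

// In the patch these are pipeline sources created via AsyncSourcer / SyncSourcer.
struct ValueSource;
struct FormatSource;

impl RowSource for ValueSource {
    fn name(&self) -> &'static str {
        "ValueSource"
    }
}

impl RowSource for FormatSource {
    fn name(&self) -> &'static str {
        "FormatSource"
    }
}

// Rough mirror of the match in InsertInterpreterV2::execute2: VALUES gets the
// expression-aware parser, every other format goes through the format factory.
fn build_source(src: &InsertInputSource) -> Box<dyn RowSource> {
    match src {
        InsertInputSource::StrWithFormat((_, format)) => match format.to_uppercase().as_str() {
            "VALUES" => Box::new(ValueSource) as Box<dyn RowSource>,
            _ => Box::new(FormatSource),
        },
    }
}

fn main() {
    let csv = InsertInputSource::StrWithFormat(("1,2\n".into(), "CSV".into()));
    let values = InsertInputSource::StrWithFormat(("(1),(2)".into(), "VALUES".into()));
    assert_eq!(build_source(&csv).name(), "FormatSource");
    assert_eq!(build_source(&values).name(), "ValueSource");
}
```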
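
The VALUES path relies on a fallback: if a cell fails plain text deserialization (for example `1 + 1` or `DEFAULT`), `parse_next_row` pops the partially filled deserializers, skips to the end of the current row, rewinds to the checkpoint, and re-parses the whole row as comma-separated SQL expressions. The row-skipping step tracks parenthesis balance while ignoring anything inside single quotes or behind a backslash escape. A self-contained sketch of that scan over a plain byte slice follows; the name `skip_group` and the slice-based signature are illustrative, while the patch applies the same idea incrementally on a `NestedCheckpointReader` with an explicit starting balance.

```rust
/// Return the offset just past the ')' that closes the current value group,
/// ignoring parentheses inside single-quoted strings and escaped characters.
/// Stand-alone illustration of the skip_to_next_row scan in the patch.
fn skip_group(data: &[u8], mut balance: i32) -> Option<usize> {
    let mut quoted = false;
    let mut escaped = false;
    for (i, &c) in data.iter().enumerate() {
        if escaped {
            escaped = false;
            continue;
        }
        match c {
            b'\\' => escaped = true,
            b'\'' => quoted = !quoted,
            b'(' if !quoted => balance += 1,
            b')' if !quoted => {
                balance -= 1;
                if balance == 0 {
                    return Some(i + 1);
                }
            }
            _ => {}
        }
    }
    None
}

fn main() {
    // The first group "(1, 'a,(b)', 2 + 3)" ends at byte 19; quoted parentheses
    // and the embedded expression do not disturb the balance.
    let rows = b"(1, 'a,(b)', 2 + 3), (4, 'x', 5)";
    assert_eq!(skip_group(rows, 0), Some(19));
}
```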
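
When a cell is the keyword `DEFAULT`, `fill_default_value` chooses, in order: the column's declared default expression, then NULL for a nullable column, then the data type's default value. A toy sketch of that precedence, with `Field`/`Value` as simplified stand-ins for `DataField`/`DataValue`:

```rust
// Simplified stand-ins for DataField / DataValue; only the precedence matters.
#[derive(Debug, PartialEq)]
enum Value {
    Null,
    Int(i64),
}

struct Field {
    default_expr: Option<i64>, // stand-in for the serialized default expression
    nullable: bool,
}

// Same order as fill_default_value in the patch: declared default, then NULL
// for nullable columns, then the type's default value.
fn default_value(field: &Field) -> Value {
    match (field.default_expr, field.nullable) {
        (Some(v), _) => Value::Int(v),
        (None, true) => Value::Null,
        (None, false) => Value::Int(0),
    }
}

fn main() {
    assert_eq!(default_value(&Field { default_expr: Some(7), nullable: false }), Value::Int(7));
    assert_eq!(default_value(&Field { default_expr: None, nullable: true }), Value::Null);
    assert_eq!(default_value(&Field { default_expr: None, nullable: false }), Value::Int(0));
}
```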