diff --git a/Cargo.lock b/Cargo.lock index cb1dc7d4574ea..d843b05921d13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1681,21 +1681,31 @@ dependencies = [ name = "common-pipeline-sources" version = "0.1.0" dependencies = [ + "async-channel", "async-trait-fn", + "bstr", + "common-arrow", "common-base", "common-catalog", "common-datablocks", + "common-datavalues", "common-exception", "common-formats", "common-io", "common-meta-types", "common-pipeline-core", + "common-settings", "common-storage", "common-streams", + "crossbeam-channel", + "csv-core", "futures", "futures-util", "opendal", "parking_lot 0.12.1", + "serde_json", + "similar-asserts", + "tracing", ] [[package]] diff --git a/src/query/formats/src/lib.rs b/src/query/formats/src/lib.rs index c5157e1aad9c9..f61f714d26c9f 100644 --- a/src/query/formats/src/lib.rs +++ b/src/query/formats/src/lib.rs @@ -27,4 +27,5 @@ mod output_format_values; pub use format::InputFormat; pub use format::InputState; +pub use format_diagnostic::verbose_string; pub use format_factory::FormatFactory; diff --git a/src/query/pipeline/sources/Cargo.toml b/src/query/pipeline/sources/Cargo.toml index 1adbe3d98573b..b2130343ff2f7 100644 --- a/src/query/pipeline/sources/Cargo.toml +++ b/src/query/pipeline/sources/Cargo.toml @@ -9,19 +9,29 @@ doctest = false test = false [dependencies] +async-channel = "1.7.1" +common-arrow = { path = "../../../common/arrow" } common-base = { path = "../../../common/base" } common-catalog = { path = "../../catalog" } common-datablocks = { path = "../../datablocks" } +common-datavalues = { path = "../../datavalues" } common-exception = { path = "../../../common/exception" } common-formats = { path = "../../formats" } common-io = { path = "../../../common/io" } common-meta-types = { path = "../../../meta/types" } common-pipeline-core = { path = "../core" } +common-settings = { path = "../../settings" } common-storage = { path = "../../../common/storage" } common-streams = { path = "../../streams" } async-trait = { version = "0.1.0", package = "async-trait-fn" } +bstr = "0.2.17" +crossbeam-channel = "0.5.6" +csv-core = "0.1.10" futures = "0.3.21" futures-util = "0.3.21" opendal = { version = "0.17.1", features = ["layers-retry", "compress"] } parking_lot = "0.12.1" +serde_json = "1.0.81" +similar-asserts = "1.2.0" +tracing = "0.1.35" diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs new file mode 100644 index 0000000000000..33ac7ecc15e9f --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs @@ -0,0 +1,57 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use common_exception::ErrorCode;
+use common_exception::Result;
+
+pub enum RecordDelimiter {
+    Crlf,
+    Any(u8),
+}
+
+impl RecordDelimiter {
+    pub fn end(&self) -> u8 {
+        match self {
+            RecordDelimiter::Crlf => b'\n',
+            RecordDelimiter::Any(b) => *b,
+        }
+    }
+}
+
+impl TryFrom<&str> for RecordDelimiter {
+    type Error = ErrorCode;
+    fn try_from(s: &str) -> Result<Self> {
+        Self::try_from(s.as_bytes())
+    }
+}
+
+impl TryFrom<&[u8]> for RecordDelimiter {
+    type Error = ErrorCode;
+    fn try_from(s: &[u8]) -> Result<Self> {
+        match s.len() {
+            1 => Ok(RecordDelimiter::Any(s[0])),
+            2 if s.eq(b"\r\n") => Ok(RecordDelimiter::Crlf),
+            _ => Err(ErrorCode::InvalidArgument(format!(
+                "bad RecordDelimiter: '{:?}'",
+                s
+            ))),
+        }
+    }
+}
+
+impl Default for RecordDelimiter {
+    fn default() -> Self {
+        RecordDelimiter::Crlf
+    }
+}
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs
new file mode 100644
index 0000000000000..ad2f256f5e747
--- /dev/null
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs
@@ -0,0 +1,316 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +use std::mem; +use std::sync::Arc; + +use common_datavalues::TypeDeserializer; +use common_exception::ErrorCode; +use common_exception::Result; +use common_formats::verbose_string; +use common_io::prelude::FormatSettings; +use common_io::prelude::NestedCheckpointReader; +use common_meta_types::StageFileFormatType; +use csv_core::ReadRecordResult; + +use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::input_format_text::AligningState; +use crate::processors::sources::input_formats::input_format_text::BlockBuilder; +use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase; +use crate::processors::sources::input_formats::input_format_text::RowBatch; +use crate::processors::sources::input_formats::InputContext; + +pub struct InputFormatCSV {} + +impl InputFormatCSV { + fn read_row( + buf: &[u8], + deserializers: &mut [common_datavalues::TypeDeserializerImpl], + field_ends: &[usize], + format_settings: &FormatSettings, + path: &str, + row_index: usize, + ) -> Result<()> { + let mut field_start = 0; + for (c, deserializer) in deserializers.iter_mut().enumerate() { + let field_end = field_ends[c]; + let col_data = &buf[field_start..field_end]; + if col_data.is_empty() { + deserializer.de_default(format_settings); + } else { + let mut reader = NestedCheckpointReader::new(col_data); + // reader.ignores(|c: u8| c == b' ').expect("must success"); + // todo(youngsofun): do not need escape, already done in csv-core + if let Err(e) = deserializer.de_text(&mut reader, format_settings) { + let mut value = String::new(); + verbose_string(buf, &mut value); + let err_msg = format!( + "fail to decode column {}: {:?}, [column_data]=[{}]", + c, e, value + ); + return Err(csv_error(&err_msg, path, row_index)); + }; + } + field_start = field_end; + } + Ok(()) + } +} + +impl InputFormatTextBase for InputFormatCSV { + fn format_type() -> StageFileFormatType { + StageFileFormatType::Csv + } + + fn default_field_delimiter() -> u8 { + b',' + } + + fn deserialize(builder: &mut BlockBuilder, batch: RowBatch) -> Result<()> { + let columns = &mut builder.mutable_columns; + let n_column = columns.len(); + let mut start = 0usize; + let start_row = batch.start_row.expect("must success"); + let mut field_end_idx = 0; + for (i, end) in batch.row_ends.iter().enumerate() { + let buf = &batch.data[start..*end]; + Self::read_row( + buf, + columns, + &batch.field_ends[field_end_idx..field_end_idx + n_column], + &builder.ctx.format_settings, + &batch.path, + start_row + i, + )?; + start = *end; + field_end_idx += n_column; + } + Ok(()) + } + + fn align(state: &mut AligningState, buf_in: &[u8]) -> Result> { + let num_fields = state.num_fields; + let reader = state.csv_reader.as_mut().expect("must success"); + let field_ends = &mut reader.field_ends[..]; + let start_row = state.rows; + state.offset += buf_in.len(); + + // assume n_out <= n_in for read_record + let mut out_tmp = vec![0u8; buf_in.len()]; + let mut endlen = reader.n_end; + let mut buf = buf_in; + + while state.rows_to_skip > 0 { + let (result, n_in, _, n_end) = + reader + .reader + .read_record(buf, &mut out_tmp, &mut field_ends[endlen..]); + buf = &buf[n_in..]; + endlen += n_end; + + match result { + ReadRecordResult::InputEmpty => { + reader.n_end = endlen; + return Ok(vec![]); + } + ReadRecordResult::OutputFull => { + return Err(csv_error( + "output more than input, in header", + &state.path, + state.rows, + )); + } + ReadRecordResult::OutputEndsFull => { + return 
Err(csv_error( + &format!( + "too many fields, expect {}, got more than {}", + num_fields, + field_ends.len() + ), + &state.path, + state.rows, + )); + } + ReadRecordResult::Record => { + if endlen < num_fields { + return Err(csv_error( + &format!("expect {} fields, only found {} ", num_fields, n_end), + &state.path, + state.rows, + )); + } else if endlen > num_fields + 1 { + return Err(csv_error( + &format!("too many fields, expect {}, got {}", num_fields, n_end), + &state.path, + state.rows, + )); + } + + state.rows_to_skip -= 1; + tracing::debug!( + "csv aligner: skip a header row, remain {}", + state.rows_to_skip + ); + state.rows += 1; + endlen = 0; + } + ReadRecordResult::End => { + return Err(csv_error("unexpect eof in header", &state.path, state.rows)); + } + } + } + + let mut out_pos = 0usize; + let mut row_batch_end: usize = 0; + + let last_batch_remain_len = reader.out.len(); + + let mut row_batch = RowBatch { + data: vec![], + row_ends: vec![], + field_ends: vec![], + path: state.path.to_string(), + batch_id: state.batch_id, + offset: 0, + start_row: Some(state.rows), + }; + + while !buf.is_empty() { + let (result, n_in, n_out, n_end) = + reader + .reader + .read_record(buf, &mut out_tmp[out_pos..], &mut field_ends[endlen..]); + buf = &buf[n_in..]; + endlen += n_end; + out_pos += n_out; + match result { + ReadRecordResult::InputEmpty => break, + ReadRecordResult::OutputFull => { + return Err(csv_error( + "output more than input", + &state.path, + start_row + row_batch.row_ends.len(), + )); + } + ReadRecordResult::OutputEndsFull => { + return Err(csv_error( + &format!( + "too many fields, expect {}, got more than {}", + num_fields, + field_ends.len() + ), + &state.path, + start_row + row_batch.row_ends.len(), + )); + } + ReadRecordResult::Record => { + if endlen < num_fields { + return Err(csv_error( + &format!("expect {} fields, only found {} ", num_fields, n_end), + &state.path, + start_row + row_batch.row_ends.len(), + )); + } else if endlen > num_fields + 1 { + return Err(csv_error( + &format!("too many fields, expect {}, got {}", num_fields, n_end), + &state.path, + start_row + row_batch.row_ends.len(), + )); + } + row_batch + .field_ends + .extend_from_slice(&field_ends[..num_fields]); + row_batch.row_ends.push(last_batch_remain_len + out_pos); + endlen = 0; + row_batch_end = out_pos; + } + ReadRecordResult::End => { + return Err(csv_error( + "unexpect eof", + &state.path, + start_row + row_batch.row_ends.len(), + )); + } + } + } + + reader.n_end = endlen; + out_tmp.truncate(out_pos); + if row_batch.row_ends.is_empty() { + tracing::debug!( + "csv aligner: {} + {} bytes => 0 rows", + reader.out.len(), + buf_in.len(), + ); + reader.out.extend_from_slice(&out_tmp); + Ok(vec![]) + } else { + let last_remain = mem::take(&mut reader.out); + + state.batch_id += 1; + state.rows += row_batch.row_ends.len(); + reader.out.extend_from_slice(&out_tmp[row_batch_end..]); + + tracing::debug!( + "csv aligner: {} + {} bytes => {} rows + {} bytes remain", + last_remain.len(), + buf_in.len(), + row_batch.row_ends.len(), + reader.out.len() + ); + + out_tmp.truncate(row_batch_end); + row_batch.data = if last_remain.is_empty() { + out_tmp + } else { + vec![last_remain, out_tmp].concat() + }; + Ok(vec![row_batch]) + } + } +} + +pub struct CsvReaderState { + pub reader: csv_core::Reader, + + // remain from last read batch + pub out: Vec, + pub field_ends: Vec, + pub n_end: usize, +} + +impl CsvReaderState { + pub(crate) fn create(ctx: &Arc) -> Self { + let reader = csv_core::ReaderBuilder::new() + 
.delimiter(ctx.field_delimiter) + .terminator(match ctx.record_delimiter { + RecordDelimiter::Crlf => csv_core::Terminator::CRLF, + RecordDelimiter::Any(v) => csv_core::Terminator::Any(v), + }) + .build(); + Self { + reader, + out: vec![], + field_ends: vec![0; ctx.schema.num_fields() + 6], + n_end: 0, + } + } +} + +fn csv_error(msg: &str, path: &str, row: usize) -> ErrorCode { + let row = row + 1; + let msg = format!("fail to parse CSV {}:{} {} ", path, row, msg); + + ErrorCode::BadBytes(msg) +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs new file mode 100644 index 0000000000000..5f04e297ce8ea --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs @@ -0,0 +1,129 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; + +use bstr::ByteSlice; +use common_datavalues::DataSchemaRef; +use common_datavalues::TypeDeserializer; +use common_datavalues::TypeDeserializerImpl; +use common_exception::ErrorCode; +use common_exception::Result; +use common_io::prelude::FormatSettings; +use common_meta_types::StageFileFormatType; + +use crate::processors::sources::input_formats::input_format_text::AligningState; +use crate::processors::sources::input_formats::input_format_text::BlockBuilder; +use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase; +use crate::processors::sources::input_formats::input_format_text::RowBatch; + +pub struct InputFormatNDJson {} + +impl InputFormatNDJson { + fn read_row( + buf: &[u8], + deserializers: &mut [TypeDeserializerImpl], + format_settings: &FormatSettings, + schema: &DataSchemaRef, + ) -> Result<()> { + let mut json: serde_json::Value = serde_json::from_reader(buf)?; + // if it's not case_sensitive, we convert to lowercase + if !format_settings.ident_case_sensitive { + if let serde_json::Value::Object(x) = json { + let y = x.into_iter().map(|(k, v)| (k.to_lowercase(), v)).collect(); + json = serde_json::Value::Object(y); + } + } + + for (f, deser) in schema.fields().iter().zip(deserializers.iter_mut()) { + let value = if format_settings.ident_case_sensitive { + &json[f.name().to_owned()] + } else { + &json[f.name().to_lowercase()] + }; + + deser.de_json(value, format_settings).map_err(|e| { + let value_str = format!("{:?}", value); + ErrorCode::BadBytes(format!( + "{}. 
column={} value={}",
+                    e,
+                    f.name(),
+                    maybe_truncated(&value_str, 1024),
+                ))
+            })?;
+        }
+        Ok(())
+    }
+}
+
+impl InputFormatTextBase for InputFormatNDJson {
+    fn format_type() -> StageFileFormatType {
+        StageFileFormatType::NdJson
+    }
+
+    fn default_field_delimiter() -> u8 {
+        b','
+    }
+
+    fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
+        let columns = &mut builder.mutable_columns;
+        let mut start = 0usize;
+        let start_row = batch.start_row;
+        for (i, end) in batch.row_ends.iter().enumerate() {
+            let buf = &batch.data[start..*end];
+            let buf = buf.trim();
+            if !buf.is_empty() {
+                if let Err(e) = Self::read_row(
+                    buf,
+                    columns,
+                    &builder.ctx.format_settings,
+                    &builder.ctx.schema,
+                ) {
+                    let row_info = if let Some(r) = start_row {
+                        format!("row={},", r + i)
+                    } else {
+                        String::new()
+                    };
+                    let msg = format!(
+                        "fail to parse NDJSON: {}, path={}, offset={}, {}",
+                        e,
+                        &batch.path,
+                        batch.offset + start,
+                        row_info,
+                    );
+                    return Err(ErrorCode::BadBytes(msg));
+                }
+            }
+            start = *end;
+        }
+        Ok(())
+    }
+
+    fn align(state: &mut AligningState<Self>, buf: &[u8]) -> Result<Vec<RowBatch>> {
+        Ok(state.align_by_record_delimiter(buf))
+    }
+}
+
+fn maybe_truncated(s: &str, limit: usize) -> Cow<'_, str> {
+    if s.len() > limit {
+        Cow::Owned(format!(
+            "(first {}B of {}B): {}",
+            limit,
+            s.len(),
+            &s[..limit]
+        ))
+    } else {
+        Cow::Borrowed(s)
+    }
+}
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs
new file mode 100644
index 0000000000000..a3354ba70190e
--- /dev/null
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs
@@ -0,0 +1,315 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::io::Cursor; +use std::io::Read; +use std::io::Seek; +use std::mem; +use std::sync::Arc; + +use common_arrow::arrow::array::Array; +use common_arrow::arrow::chunk::Chunk; +use common_arrow::arrow::datatypes::Field; +use common_arrow::arrow::io::parquet::read; +use common_arrow::arrow::io::parquet::read::read_columns; +use common_arrow::arrow::io::parquet::read::to_deserializer; +use common_arrow::arrow::io::parquet::read::RowGroupDeserializer; +use common_arrow::parquet::metadata::ColumnChunkMetaData; +use common_arrow::parquet::metadata::FileMetaData; +use common_arrow::parquet::metadata::RowGroupMetaData; +use common_arrow::parquet::read::read_metadata; +use common_base::base::tokio::sync::mpsc::Receiver; +use common_datablocks::DataBlock; +use common_datavalues::remove_nullable; +use common_datavalues::DataField; +use common_datavalues::DataSchemaRef; +use common_exception::ErrorCode; +use common_exception::Result; +use common_pipeline_core::Pipeline; +use opendal::Object; +use similar_asserts::traits::MakeDiff; + +use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_format::FileInfo; +use crate::processors::sources::input_formats::input_format::InputData; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; +use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch; +use crate::processors::sources::input_formats::InputFormat; + +pub struct InputFormatParquet; + +#[async_trait::async_trait] +impl InputFormat for InputFormatParquet { + fn default_record_delimiter(&self) -> RecordDelimiter { + RecordDelimiter::Crlf + } + + fn default_field_delimiter(&self) -> u8 { + b'_' + } + + async fn read_file_meta( + &self, + _obj: &Object, + _size: usize, + ) -> Result>> { + // todo(youngsofun): execute_copy_aligned + Ok(None) + } + + async fn read_split_meta( + &self, + _obj: &Object, + _split_info: &SplitInfo, + ) -> Result>> { + Ok(None) + } + + fn split_files(&self, file_infos: Vec, _split_size: usize) -> Vec { + file_infos + .into_iter() + .map(SplitInfo::from_file_info) + .collect() + } + + fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + // todo(youngsofun): execute_copy_aligned + ParquetFormatPipe::execute_copy_with_aligner(ctx, pipeline) + } + + fn exec_stream( + &self, + ctx: Arc, + pipeline: &mut Pipeline, + input: Receiver, + ) -> Result<()> { + ParquetFormatPipe::execute_stream(ctx, pipeline, input) + } +} + +pub struct ParquetFormatPipe; + +#[async_trait::async_trait] +impl InputFormatPipe for ParquetFormatPipe { + type ReadBatch = ReadBatch; + type RowBatch = RowGroupInMemory; + type AligningState = AligningState; + type BlockBuilder = ParquetBlockBuilder; +} + +pub struct RowGroupInMemory { + pub meta: RowGroupMetaData, + pub fields: Arc>, + pub field_meta_indexes: Vec>, + pub field_arrays: Vec>>, +} + +impl RowGroupInMemory { + fn read( + reader: &mut R, + meta: RowGroupMetaData, + fields: Arc>, + ) -> Result { + let field_names = fields.iter().map(|x| x.name.as_str()).collect::>(); + let field_meta_indexes = 
split_column_metas_by_field(meta.columns(), &field_names); + let mut filed_arrays = vec![]; + for field_name in field_names { + let meta_data = read_columns(reader, meta.columns(), field_name)?; + let data = meta_data.into_iter().map(|t| t.1).collect::>(); + filed_arrays.push(data) + } + Ok(Self { + meta, + field_meta_indexes, + field_arrays: filed_arrays, + fields, + }) + } + + fn get_arrow_chunk(&mut self) -> Result>> { + let mut column_chunks = vec![]; + let field_arrays = mem::take(&mut self.field_arrays); + for (f, datas) in field_arrays.into_iter().enumerate() { + let meta_iters = self.field_meta_indexes[f] + .iter() + .map(|c| &self.meta.columns()[*c]); + let meta_data = meta_iters.zip(datas.into_iter()).collect::>(); + let array_iters = to_deserializer( + meta_data, + self.fields[f].clone(), + self.meta.num_rows() as usize, + None, + None, + )?; + column_chunks.push(array_iters); + } + match RowGroupDeserializer::new(column_chunks, self.meta.num_rows(), None).next() { + None => Err(ErrorCode::ParquetError("fail to get a chunk")), + Some(Ok(chunk)) => Ok(chunk), + Some(Err(e)) => Err(ErrorCode::ParquetError(e.to_string())), + } + } +} + +impl Debug for RowGroupInMemory { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "RowGroupInMemory") + } +} + +#[derive(Debug)] +pub enum ReadBatch { + Buffer(Vec), + #[allow(unused)] + RowGroup(RowGroupInMemory), +} + +impl From> for ReadBatch { + fn from(v: Vec) -> Self { + Self::Buffer(v) + } +} + +pub struct ParquetBlockBuilder { + ctx: Arc, +} + +impl BlockBuilderTrait for ParquetBlockBuilder { + type Pipe = ParquetFormatPipe; + + fn create(ctx: Arc) -> Self { + ParquetBlockBuilder { ctx } + } + + fn deserialize(&mut self, mut batch: Option) -> Result> { + if let Some(rg) = batch.as_mut() { + let chunk = rg.get_arrow_chunk()?; + let block = DataBlock::from_chunk(&self.ctx.schema, &chunk)?; + Ok(vec![block]) + } else { + Ok(vec![]) + } + } +} + +pub struct AligningState { + ctx: Arc, + split_info: SplitInfo, + buffers: Vec>, +} + +impl AligningStateTrait for AligningState { + type Pipe = ParquetFormatPipe; + + fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result { + Ok(AligningState { + ctx: ctx.clone(), + split_info: split_info.clone(), + buffers: vec![], + }) + } + + fn align(&mut self, read_batch: Option) -> Result> { + if let Some(rb) = read_batch { + if let ReadBatch::Buffer(b) = rb { + self.buffers.push(b) + }; + Ok(vec![]) + } else { + let file_in_memory = self.buffers.concat(); + let size = file_in_memory.len(); + tracing::debug!( + "aligning parquet file {} of {} bytes", + self.split_info.file_info.path, + size, + ); + let mut cursor = Cursor::new(file_in_memory); + let file_meta = + read_metadata(&mut cursor).map_err(|e| ErrorCode::ParquetError(e.to_string()))?; + let read_fields = Arc::new(get_fields(&file_meta, &self.ctx.schema)?); + + let mut row_batches = Vec::with_capacity(file_meta.row_groups.len()); + for row_group in file_meta.row_groups.into_iter() { + row_batches.push(RowGroupInMemory::read( + &mut cursor, + row_group, + read_fields.clone(), + )?) 
+ } + tracing::info!( + "align parquet file {} of {} bytes to {} row groups", + self.split_info.file_info.path, + size, + row_batches.len() + ); + Ok(row_batches) + } + } +} + +fn get_fields(file_meta: &FileMetaData, schema: &DataSchemaRef) -> Result> { + let infer_schema = read::infer_schema(file_meta)?; + let mut read_fields = Vec::with_capacity(schema.num_fields()); + for f in schema.fields().iter() { + if let Some(m) = infer_schema + .fields + .iter() + .filter(|c| c.name.eq_ignore_ascii_case(f.name())) + .last() + { + let tf = DataField::from(m); + if remove_nullable(tf.data_type()) != remove_nullable(f.data_type()) { + let pair = (f, m); + let diff = pair.make_diff("expected_field", "infer_field"); + return Err(ErrorCode::ParquetError(format!( + "parquet schema mismatch, differ: {}", + diff + ))); + } + + read_fields.push(m.clone()); + } else { + return Err(ErrorCode::ParquetError(format!( + "schema field size mismatch, expected to find column: {}", + f.name() + ))); + } + } + Ok(read_fields) +} + +pub fn split_column_metas_by_field( + columns: &[ColumnChunkMetaData], + field_names: &[&str], +) -> Vec> { + let mut r = field_names.iter().map(|_| vec![]).collect::>(); + let d = field_names + .iter() + .enumerate() + .map(|(i, name)| (name, i)) + .collect::>(); + columns.iter().enumerate().for_each(|(col_i, x)| { + if let Some(field_i) = d.get(&x.descriptor().path_in_schema[0].as_str()) { + r[*field_i].push(col_i); + } + }); + r +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs new file mode 100644 index 0000000000000..6a10421ab5048 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs @@ -0,0 +1,144 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use common_datavalues::TypeDeserializer; +use common_exception::ErrorCode; +use common_exception::Result; +use common_formats::verbose_string; +use common_io::prelude::BufferReadExt; +use common_io::prelude::FormatSettings; +use common_io::prelude::NestedCheckpointReader; +use common_meta_types::StageFileFormatType; + +use crate::processors::sources::input_formats::input_format_text::AligningState; +use crate::processors::sources::input_formats::input_format_text::BlockBuilder; +use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase; +use crate::processors::sources::input_formats::input_format_text::RowBatch; + +pub struct InputFormatTSV {} + +impl InputFormatTSV { + fn read_row( + buf: &[u8], + deserializers: &mut Vec, + format_settings: &FormatSettings, + path: &str, + batch_id: usize, + offset: usize, + row_index: Option, + ) -> Result<()> { + let num_columns = deserializers.len(); + let mut column_index = 0; + let mut field_start = 0; + let mut pos = 0; + let mut err_msg = None; + let buf_len = buf.len(); + while pos <= buf_len { + if pos == buf_len || buf[pos] == b'\t' { + let col_data = &buf[field_start..pos]; + if col_data.is_empty() { + deserializers[column_index].de_default(format_settings); + } else { + let mut reader = NestedCheckpointReader::new(col_data); + reader.ignores(|c: u8| c == b' ').expect("must success"); + if let Err(e) = + deserializers[column_index].de_text(&mut reader, format_settings) + { + err_msg = Some(format!( + "fail to decode column {}: {:?}, [column_data]=[{}]", + column_index, e, "" + )); + break; + }; + // todo(youngsofun): check remaining data + } + column_index += 1; + field_start = pos + 1; + if column_index > num_columns { + err_msg = Some("too many columns".to_string()); + break; + } + } + pos += 1; + } + if column_index < num_columns - 1 { + // todo(youngsofun): allow it optionally (set default) + err_msg = Some(format!( + "need {} columns, find {} only", + num_columns, + column_index + 1 + )); + } + if let Some(m) = err_msg { + let row_info = if let Some(r) = row_index { + format!("at row {},", r) + } else { + String::new() + }; + let mut msg = format!( + "fail to parse tsv {} batch {} at offset {}, {} reason={}, row data: ", + path, + batch_id, + offset + pos, + row_info, + m + ); + verbose_string(buf, &mut msg); + Err(ErrorCode::BadBytes(msg)) + } else { + Ok(()) + } + } +} + +impl InputFormatTextBase for InputFormatTSV { + fn format_type() -> StageFileFormatType { + StageFileFormatType::Tsv + } + + fn default_field_delimiter() -> u8 { + b'\t' + } + + fn deserialize(builder: &mut BlockBuilder, batch: RowBatch) -> Result<()> { + tracing::debug!( + "tsv deserializing row batch {}, id={}, start_row={:?}, offset={}", + batch.path, + batch.batch_id, + batch.start_row, + batch.offset + ); + let columns = &mut builder.mutable_columns; + let mut start = 0usize; + let start_row = batch.start_row; + for (i, end) in batch.row_ends.iter().enumerate() { + let buf = &batch.data[start..*end]; // include \n + Self::read_row( + buf, + columns, + &builder.ctx.format_settings, + &batch.path, + batch.batch_id, + batch.offset + start, + start_row.map(|n| n + i), + )?; + start = *end; + } + Ok(()) + } + + fn align(state: &mut AligningState, buf: &[u8]) -> Result> { + Ok(state.align_by_record_delimiter(buf)) + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs new file mode 100644 index 
0000000000000..cd6abe156cb4d --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod input_format_csv; +pub mod input_format_ndjson; +pub mod input_format_parquet; +pub mod input_format_tsv; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs new file mode 100644 index 0000000000000..c3606a31e1811 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs @@ -0,0 +1,248 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::str::FromStr; +use std::sync::Arc; + +use common_base::base::Progress; +use common_datavalues::DataSchemaRef; +use common_exception::ErrorCode; +use common_exception::Result; +use common_io::prelude::FormatSettings; +use common_meta_types::StageFileCompression; +use common_meta_types::StageFileFormatType; +use common_meta_types::UserStageInfo; +use common_settings::Settings; +use opendal::io_util::CompressAlgorithm; +use opendal::Operator; + +use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::impls::input_format_csv::InputFormatCSV; +use crate::processors::sources::input_formats::impls::input_format_ndjson::InputFormatNDJson; +use crate::processors::sources::input_formats::impls::input_format_parquet::InputFormatParquet; +use crate::processors::sources::input_formats::impls::input_format_tsv::InputFormatTSV; +use crate::processors::sources::input_formats::input_format::FileInfo; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::input_format_text::InputFormatText; +use crate::processors::sources::input_formats::InputFormat; + +pub enum InputPlan { + CopyInto(Box), + StreamingLoad, +} + +pub struct CopyIntoPlan { + pub stage_info: UserStageInfo, + pub files: Vec, +} + +pub struct InputContext { + pub plan: InputPlan, + pub schema: DataSchemaRef, + pub operator: Operator, + pub format: Arc, + pub splits: Vec, + + // row format only + pub rows_to_skip: usize, + pub field_delimiter: u8, + pub record_delimiter: RecordDelimiter, + + // runtime config + pub settings: Arc, + pub format_settings: FormatSettings, + + pub read_batch_size: usize, + pub rows_per_block: usize, + + pub scan_progress: Arc, +} + +impl InputContext { + pub fn get_input_format(format: &StageFileFormatType) -> Result> { + match format { + StageFileFormatType::Tsv => Ok(Arc::new(InputFormatText::::create())), + StageFileFormatType::Csv => Ok(Arc::new(InputFormatText::::create())), + StageFileFormatType::NdJson => { + Ok(Arc::new(InputFormatText::::create())) + } + StageFileFormatType::Parquet => Ok(Arc::new(InputFormatParquet {})), + format => Err(ErrorCode::LogicalError(format!( + "Unsupported file format: {:?}", + format + ))), + } + } + + pub async fn try_create_from_copy( + operator: Operator, + settings: Arc, + format_settings: FormatSettings, + schema: DataSchemaRef, + stage_info: UserStageInfo, + files: Vec, + scan_progress: Arc, + ) -> Result { + if files.is_empty() { + return Err(ErrorCode::BadArguments("no file to copy")); + } + let plan = Box::new(CopyIntoPlan { stage_info, files }); + let read_batch_size = settings.get_input_read_buffer_size()? as usize; + let split_size = 128usize * 1024 * 1024; + let file_format_options = &plan.stage_info.file_format_options; + let format = Self::get_input_format(&file_format_options.format)?; + let file_infos = Self::get_file_infos(&format, &operator, &plan).await?; + let splits = format.split_files(file_infos, split_size); + let rows_per_block = settings.get_max_block_size()? as usize; + let record_delimiter = { + if file_format_options.record_delimiter.is_empty() { + format.default_record_delimiter() + } else { + RecordDelimiter::try_from(file_format_options.record_delimiter.as_str())? 
+ } + }; + + let rows_to_skip = file_format_options.skip_header as usize; + let field_delimiter = { + if file_format_options.field_delimiter.is_empty() { + format.default_field_delimiter() + } else { + file_format_options.field_delimiter.as_bytes()[0] + } + }; + Ok(InputContext { + format, + schema, + operator, + splits, + settings, + format_settings, + record_delimiter, + rows_per_block, + read_batch_size, + plan: InputPlan::CopyInto(plan), + rows_to_skip, + field_delimiter, + scan_progress, + }) + } + + #[allow(unused)] + async fn try_create_from_insert( + format_name: &str, + operator: Operator, + settings: Arc, + format_settings: FormatSettings, + schema: DataSchemaRef, + scan_progress: Arc, + ) -> Result { + let format = + StageFileFormatType::from_str(format_name).map_err(ErrorCode::UnknownFormat)?; + let format = Self::get_input_format(&format)?; + let read_batch_size = settings.get_input_read_buffer_size()? as usize; + let rows_per_block = settings.get_max_block_size()? as usize; + let field_delimiter = settings.get_field_delimiter()?; + let field_delimiter = { + if field_delimiter.is_empty() { + format.default_field_delimiter() + } else { + field_delimiter.as_bytes()[0] + } + }; + let record_delimiter = RecordDelimiter::try_from(&settings.get_record_delimiter()?[..])?; + let rows_to_skip = settings.get_skip_header()? as usize; + Ok(InputContext { + format, + schema, + operator, + settings, + format_settings, + record_delimiter, + rows_per_block, + read_batch_size, + field_delimiter, + rows_to_skip, + scan_progress, + plan: InputPlan::StreamingLoad, + splits: Default::default(), + }) + } + + async fn get_file_infos( + format: &Arc, + op: &Operator, + plan: &CopyIntoPlan, + ) -> Result> { + let mut infos = vec![]; + for p in &plan.files { + let obj = op.object(p); + let size = obj.metadata().await?.content_length() as usize; + let file_meta = format.read_file_meta(&obj, size).await?; + let compress_alg = InputContext::get_compression_alg_copy( + plan.stage_info.file_format_options.compression, + p, + )?; + let info = FileInfo { + path: p.clone(), + size, + compress_alg, + file_meta, + }; + infos.push(info) + } + Ok(infos) + } + + pub fn num_prefetch_splits(&self) -> Result { + Ok(self.settings.get_max_threads()? 
as usize) + } + + pub fn num_prefetch_per_split(&self) -> usize { + 1 + } + + pub fn get_compression_alg(&self, path: &str) -> Result> { + let opt = match &self.plan { + InputPlan::CopyInto(p) => p.stage_info.file_format_options.compression, + _ => StageFileCompression::None, + }; + Self::get_compression_alg_copy(opt, path) + } + + pub fn get_compression_alg_copy( + compress_option: StageFileCompression, + path: &str, + ) -> Result> { + let compression_algo = match compress_option { + StageFileCompression::Auto => CompressAlgorithm::from_path(path), + StageFileCompression::Gzip => Some(CompressAlgorithm::Gzip), + StageFileCompression::Bz2 => Some(CompressAlgorithm::Bz2), + StageFileCompression::Brotli => Some(CompressAlgorithm::Brotli), + StageFileCompression::Zstd => Some(CompressAlgorithm::Zstd), + StageFileCompression::Deflate => Some(CompressAlgorithm::Zlib), + StageFileCompression::RawDeflate => Some(CompressAlgorithm::Deflate), + StageFileCompression::Xz => Some(CompressAlgorithm::Xz), + StageFileCompression::Lzo => { + return Err(ErrorCode::UnImplement("compress type lzo is unimplemented")); + } + StageFileCompression::Snappy => { + return Err(ErrorCode::UnImplement( + "compress type snappy is unimplemented", + )); + } + StageFileCompression::None => None, + }; + Ok(compression_algo) + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs new file mode 100644 index 0000000000000..8dda2c8f1e7fb --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs @@ -0,0 +1,148 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::any::Any; +use std::cmp::min; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use common_base::base::tokio::sync::mpsc::Receiver; +use common_exception::Result; +use common_pipeline_core::Pipeline; +use opendal::io_util::CompressAlgorithm; +use opendal::Object; + +use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch; + +pub trait InputData: Send + Sync + 'static { + fn as_any(&self) -> &dyn Any; +} + +pub trait InputState: Send { + fn as_any(&mut self) -> &mut dyn Any; +} + +#[async_trait::async_trait] +pub trait InputFormat: Send + Sync { + fn default_record_delimiter(&self) -> RecordDelimiter; + + fn default_field_delimiter(&self) -> u8; + + async fn read_file_meta(&self, obj: &Object, size: usize) + -> Result>>; + + async fn read_split_meta( + &self, + obj: &Object, + split_info: &SplitInfo, + ) -> Result>>; + + fn split_files(&self, file_infos: Vec, split_size: usize) -> Vec; + + fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()>; + + fn exec_stream( + &self, + ctx: Arc, + pipeline: &mut Pipeline, + input: Receiver, + ) -> Result<()>; +} + +#[derive(Clone)] +pub struct FileInfo { + pub path: String, + pub size: usize, + pub compress_alg: Option, + pub file_meta: Option>, +} + +impl FileInfo { + pub fn split_by_size(&self, split_size: usize) -> Vec { + let mut splits = vec![]; + let n = (self.size + split_size - 1) / split_size; + for i in 0..n - 1 { + splits.push(SplitInfo { + file_info: self.clone(), + seq_infile: i, + is_end: i == n - 1, + offset: i * split_size, + len: min((i + 1) * split_size, self.size), + split_meta: None, + }) + } + splits + } +} + +impl Debug for FileInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileInfo") + .field("path", &self.path) + .field("size", &self.size) + .finish() + } +} + +#[derive(Clone)] +pub struct SplitInfo { + pub file_info: FileInfo, + pub seq_infile: usize, + pub is_end: bool, + pub offset: usize, + pub len: usize, + pub split_meta: Option>, +} + +impl SplitInfo { + pub fn from_file_info(file_info: FileInfo) -> Self { + let len = file_info.size; + Self { + file_info, + seq_infile: 0, + is_end: true, + offset: 0, + len, + split_meta: None, + } + } + + pub fn from_stream_split(path: String) -> Self { + SplitInfo { + file_info: FileInfo { + path, + size: 0, + compress_alg: None, + file_meta: None, + }, + seq_infile: 0, + offset: 0, + len: 0, + is_end: false, + split_meta: None, + } + } +} + +impl Debug for SplitInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SplitInfo") + .field("file_info", &self.file_info) + .field("seq_infile", &self.seq_infile) + .finish() + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs new file mode 100644 index 0000000000000..0363dd6231d8d --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs @@ -0,0 +1,390 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::marker::PhantomData; +use std::mem; +use std::sync::Arc; + +use common_base::base::tokio::sync::mpsc::Receiver; +use common_datablocks::DataBlock; +use common_datavalues::TypeDeserializer; +use common_datavalues::TypeDeserializerImpl; +use common_exception::ErrorCode; +use common_exception::Result; +use common_meta_types::StageFileFormatType; +use common_pipeline_core::Pipeline; +use opendal::io_util::DecompressDecoder; +use opendal::io_util::DecompressState; +use opendal::Object; + +use super::InputFormat; +use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::impls::input_format_csv::CsvReaderState; +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_format::FileInfo; +use crate::processors::sources::input_formats::input_format::InputData; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; +use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch; + +pub trait InputFormatTextBase: Sized + Send + Sync + 'static { + fn format_type() -> StageFileFormatType; + + fn is_splittable() -> bool { + false + } + + fn default_record_delimiter() -> RecordDelimiter { + RecordDelimiter::Crlf + } + + fn default_field_delimiter() -> u8; + + fn deserialize(builder: &mut BlockBuilder, batch: RowBatch) -> Result<()>; + + fn align(state: &mut AligningState, buf: &[u8]) -> Result>; +} + +pub struct InputFormatText { + phantom: PhantomData, +} + +impl InputFormatText { + pub fn create() -> Self { + Self { + phantom: Default::default(), + } + } +} + +pub struct InputFormatTextPipe { + phantom: PhantomData, +} + +#[async_trait::async_trait] +impl InputFormatPipe for InputFormatTextPipe { + type ReadBatch = Vec; + type RowBatch = RowBatch; + type AligningState = AligningState; + type BlockBuilder = BlockBuilder; +} + +#[async_trait::async_trait] +impl InputFormat for InputFormatText { + fn default_record_delimiter(&self) -> RecordDelimiter { + T::default_record_delimiter() + } + + fn default_field_delimiter(&self) -> u8 { + T::default_field_delimiter() + } + + async fn read_file_meta( + &self, + _obj: &Object, + _size: usize, + ) -> Result>> { + Ok(None) + } + + async fn read_split_meta( + &self, + _obj: &Object, + _split_info: &SplitInfo, + ) -> Result>> { + Ok(None) + } + + fn split_files(&self, file_infos: Vec, split_size: usize) -> Vec { + let mut splits = vec![]; + for f in file_infos { + if f.compress_alg.is_none() || !T::is_splittable() { + splits.push(SplitInfo::from_file_info(f)) + } else { + splits.append(&mut f.split_by_size(split_size)) + } + } + splits + } + + fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + tracing::info!("exe text"); + InputFormatTextPipe::::execute_copy_with_aligner(ctx, pipeline) + } + + fn 
exec_stream( + &self, + ctx: Arc, + pipeline: &mut Pipeline, + input: Receiver, + ) -> Result<()> { + InputFormatTextPipe::::execute_stream(ctx, pipeline, input) + } +} + +#[derive(Default)] +pub struct RowBatch { + pub data: Vec, + pub row_ends: Vec, + pub field_ends: Vec, + + // for error info + pub path: String, + pub batch_id: usize, + pub offset: usize, + pub start_row: Option, +} + +pub struct AligningState { + pub path: String, + pub record_delimiter_end: u8, + pub field_delimiter: u8, + pub batch_id: usize, + pub rows: usize, + pub offset: usize, + pub rows_to_skip: usize, + pub tail_of_last_batch: Vec, + pub num_fields: usize, + pub decoder: Option, + pub csv_reader: Option, + phantom: PhantomData, +} + +impl AligningState { + pub fn align_by_record_delimiter(&mut self, buf_in: &[u8]) -> Vec { + let record_delimiter_end = self.record_delimiter_end; + let size_last_remain = self.tail_of_last_batch.len(); + let mut buf = buf_in; + if self.rows_to_skip > 0 { + let mut i = 0; + for b in buf.iter() { + if *b == record_delimiter_end { + self.rows_to_skip -= 1; + if self.rows_to_skip == 0 { + break; + } + } + i += 1; + } + if self.rows_to_skip > 0 { + self.tail_of_last_batch = vec![]; + return vec![]; + } else { + buf = &buf[i + 1..]; + } + } + if buf.is_empty() { + return vec![]; + } + + let mut output = RowBatch::default(); + let rows = &mut output.row_ends; + for (i, b) in buf.iter().enumerate() { + if *b == b'\n' { + rows.push(i + 1 + size_last_remain) + } + } + if rows.is_empty() { + self.tail_of_last_batch.extend_from_slice(buf); + vec![] + } else { + let batch_end = rows[rows.len() - 1] - size_last_remain; + output.data = mem::take(&mut self.tail_of_last_batch); + output.data.extend_from_slice(&buf[..batch_end]); + self.tail_of_last_batch.extend_from_slice(&buf[batch_end..]); + let size = output.data.len(); + output.path = self.path.to_string(); + output.start_row = Some(self.rows); + output.offset = self.offset; + output.batch_id = self.batch_id; + self.offset += size; + self.rows += rows.len(); + self.batch_id += 1; + tracing::debug!( + "align batch {}, {} + {} + {} bytes to {} rows", + output.batch_id, + size_last_remain, + batch_end, + self.tail_of_last_batch.len(), + rows.len(), + ); + vec![output] + } + } + + fn flush(&mut self) -> Vec { + if self.tail_of_last_batch.is_empty() { + vec![] + } else { + // last row + let data = mem::take(&mut self.tail_of_last_batch); + let end = data.len(); + let row_batch = RowBatch { + data, + row_ends: vec![end], + field_ends: vec![], + path: self.path.to_string(), + batch_id: self.batch_id, + offset: self.offset, + start_row: Some(self.rows), + }; + tracing::debug!( + "align flush batch {}, bytes = {}, start_row = {}", + row_batch.batch_id, + self.tail_of_last_batch.len(), + self.rows + ); + vec![row_batch] + } + } +} + +impl AligningStateTrait for AligningState { + type Pipe = InputFormatTextPipe; + + fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result { + let rows_to_skip = if split_info.seq_infile == 0 { + ctx.rows_to_skip + } else { + 0 + }; + let path = split_info.file_info.path.clone(); + + let decoder = ctx.get_compression_alg(&path)?.map(DecompressDecoder::new); + let csv_reader = if T::format_type() == StageFileFormatType::Csv { + Some(CsvReaderState::create(ctx)) + } else { + None + }; + + Ok(AligningState:: { + path, + decoder, + rows_to_skip, + csv_reader, + tail_of_last_batch: vec![], + rows: 0, + batch_id: 0, + num_fields: ctx.schema.num_fields(), + offset: split_info.offset, + record_delimiter_end: 
ctx.record_delimiter.end(), + field_delimiter: ctx.field_delimiter, + phantom: Default::default(), + }) + } + + fn align(&mut self, read_batch: Option>) -> Result> { + let row_batches = if let Some(data) = read_batch { + let buf = if let Some(decoder) = self.decoder.as_mut() { + decompress(decoder, &data)? + } else { + data + }; + T::align(self, &buf)? + } else { + if let Some(decoder) = &self.decoder { + assert_eq!(decoder.state(), DecompressState::Done) + } + self.flush() + }; + Ok(row_batches) + } +} + +pub struct BlockBuilder { + pub ctx: Arc, + pub mutable_columns: Vec, + pub num_rows: usize, + phantom: PhantomData, +} + +impl BlockBuilder { + fn flush(&mut self) -> Result> { + let mut columns = Vec::with_capacity(self.mutable_columns.len()); + for deserializer in &mut self.mutable_columns { + columns.push(deserializer.finish_to_column()); + } + self.mutable_columns = self + .ctx + .schema + .create_deserializers(self.ctx.rows_per_block); + self.num_rows = 0; + + Ok(vec![DataBlock::create(self.ctx.schema.clone(), columns)]) + } +} + +impl BlockBuilderTrait for BlockBuilder { + type Pipe = InputFormatTextPipe; + + fn create(ctx: Arc) -> Self { + let columns = ctx.schema.create_deserializers(ctx.rows_per_block); + BlockBuilder { + ctx, + mutable_columns: columns, + num_rows: 0, + phantom: Default::default(), + } + } + + fn deserialize(&mut self, batch: Option) -> Result> { + if let Some(b) = batch { + self.num_rows += b.row_ends.len(); + T::deserialize(self, b)?; + if self.num_rows >= self.ctx.rows_per_block { + self.flush() + } else { + Ok(vec![]) + } + } else { + self.flush() + } + } +} + +fn decompress(decoder: &mut DecompressDecoder, compressed: &[u8]) -> Result> { + let mut decompress_bufs = vec![]; + let mut amt = 0; + loop { + match decoder.state() { + DecompressState::Reading => { + if amt == compressed.len() { + break; + } + let read = decoder.fill(&compressed[amt..]); + amt += read; + } + DecompressState::Decoding => { + let mut decompress_buf = vec![0u8; 4096]; + let written = decoder.decode(&mut decompress_buf[..]).map_err(|e| { + ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}")) + })?; + decompress_buf.truncate(written); + decompress_bufs.push(decompress_buf); + } + DecompressState::Flushing => { + let mut decompress_buf = vec![0u8; 4096]; + let written = decoder.finish(&mut decompress_buf).map_err(|e| { + ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}")) + })?; + decompress_buf.truncate(written); + decompress_bufs.push(decompress_buf); + } + DecompressState::Done => break, + } + } + Ok(decompress_bufs.concat()) +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs new file mode 100644 index 0000000000000..3e691c792ad13 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs @@ -0,0 +1,271 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::sync::Arc; + +use common_base::base::tokio; +use common_base::base::tokio::sync::mpsc::Receiver; +use common_base::base::tokio::sync::mpsc::Sender; +use common_base::base::GlobalIORuntime; +use common_base::base::TrySpawn; +use common_datablocks::DataBlock; +use common_exception::Result; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::Pipeline; +use common_pipeline_core::SourcePipeBuilder; +use futures_util::stream::FuturesUnordered; +use futures_util::AsyncReadExt; +use futures_util::StreamExt; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_context::InputPlan; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::source_aligner::Aligner; +use crate::processors::sources::input_formats::source_deserializer::DeserializeSource; +use crate::processors::sources::input_formats::transform_deserializer::DeserializeTransformer; + +pub struct Split { + pub(crate) info: SplitInfo, + pub(crate) rx: Receiver>, +} + +#[allow(unused)] +pub struct StreamingSplit { + path: String, + data_tx: Sender, +} + +pub struct StreamingReadBatch { + data: Vec, + pub(crate) path: String, + pub(crate) is_start: bool, +} + +pub trait AligningStateTrait: Sized { + type Pipe: InputFormatPipe; + + fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result; + + fn align( + &mut self, + read_batch: Option<::ReadBatch>, + ) -> Result::RowBatch>>; +} + +pub trait BlockBuilderTrait { + type Pipe: InputFormatPipe; + fn create(ctx: Arc) -> Self; + + fn deserialize( + &mut self, + batch: Option<::RowBatch>, + ) -> Result>; +} + +#[async_trait::async_trait] +pub trait InputFormatPipe: Sized + Send + 'static { + type ReadBatch: From> + Send + Debug; + type RowBatch: Send; + type AligningState: AligningStateTrait + Send; + type BlockBuilder: BlockBuilderTrait + Send; + + fn execute_stream( + ctx: Arc, + pipeline: &mut Pipeline, + mut input: Receiver, + ) -> Result<()> { + let (split_tx, split_rx) = async_channel::bounded(ctx.num_prefetch_splits()?); + Self::build_pipeline_with_aligner(&ctx, split_rx, pipeline)?; + + tokio::spawn(async move { + let mut sender: Option>> = None; + while let Some(batch) = input.recv().await { + if batch.is_start { + let (data_tx, data_rx) = tokio::sync::mpsc::channel(1); + sender = Some(data_tx); + let split_info = SplitInfo::from_stream_split(batch.path.clone()); + split_tx + .send(Split { + info: split_info, + rx: data_rx, + }) + .await + .expect("fail to send split from stream"); + } + if let Some(s) = sender.as_mut() { + s.send(Ok(batch.data.into())) + .await + .expect("fail to send read batch from stream"); + } + } + }); + Ok(()) + } + + fn execute_copy_with_aligner(ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + let (split_tx, split_rx) = async_channel::bounded(ctx.num_prefetch_splits()?); + Self::build_pipeline_with_aligner(&ctx, split_rx, pipeline)?; + + let ctx_clone = ctx.clone(); + GlobalIORuntime::instance().spawn(async move { + tracing::debug!("start copy splits feeder"); + for s in &ctx_clone.splits { + let (data_tx, data_rx) = tokio::sync::mpsc::channel(ctx.num_prefetch_per_split()); + let split_clone = s.clone(); + let ctx_clone2 = ctx_clone.clone(); + tokio::spawn(async move { + if let Err(e) = + Self::copy_reader_with_aligner(ctx_clone2, split_clone, 
data_tx).await + { + tracing::error!("copy split reader error: {:?}", e); + } else { + tracing::debug!("copy split reader stopped"); + } + }); + if split_tx + .send(Split { + info: s.clone(), + rx: data_rx, + }) + .await + .is_err() + { + break; + }; + } + tracing::info!("end copy splits feeder"); + }); + + Ok(()) + } + + fn execute_copy_aligned(ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + let (data_tx, data_rx) = async_channel::bounded(ctx.num_prefetch_splits()?); + Self::build_pipeline_aligned(&ctx, data_rx, pipeline)?; + + let ctx_clone = ctx.clone(); + let p = 3; + tokio::spawn(async move { + let mut futs = FuturesUnordered::new(); + for s in &ctx_clone.splits { + let fut = Self::read_split(ctx_clone.clone(), s.clone()); + futs.push(fut); + if futs.len() >= p { + let row_batch = futs.next().await.unwrap().unwrap(); + data_tx.send(row_batch).await.unwrap(); + } + } + + while let Some(row_batch) = futs.next().await { + data_tx.send(row_batch.unwrap()).await.unwrap(); + } + }); + Ok(()) + } + + fn build_pipeline_aligned( + ctx: &Arc, + row_batch_rx: async_channel::Receiver, + pipeline: &mut Pipeline, + ) -> Result<()> { + let mut builder = SourcePipeBuilder::create(); + for _ in 0..ctx.settings.get_max_threads()? { + let output = OutputPort::create(); + let source = DeserializeSource::::create( + ctx.clone(), + output.clone(), + row_batch_rx.clone(), + )?; + builder.add_source(output, source); + } + pipeline.add_pipe(builder.finalize()); + Ok(()) + } + + fn build_pipeline_with_aligner( + ctx: &Arc, + split_rx: async_channel::Receiver>, + pipeline: &mut Pipeline, + ) -> Result<()> { + let mut builder = SourcePipeBuilder::create(); + let n_threads = ctx.settings.get_max_threads()? as usize; + let max_aligner = match ctx.plan { + InputPlan::CopyInto(_) => ctx.splits.len(), + InputPlan::StreamingLoad => 3, + }; + let (row_batch_tx, row_batch_rx) = crossbeam_channel::bounded(n_threads); + for _ in 0..std::cmp::min(max_aligner, n_threads) { + let output = OutputPort::create(); + let source = Aligner::::try_create( + output.clone(), + ctx.clone(), + split_rx.clone(), + row_batch_tx.clone(), + )?; + builder.add_source(output, source); + } + pipeline.add_pipe(builder.finalize()); + pipeline.resize(n_threads)?; + pipeline.add_transform(|input, output| { + DeserializeTransformer::::create(ctx.clone(), input, output, row_batch_rx.clone()) + })?; + Ok(()) + } + + async fn read_split(_ctx: Arc, _split_info: SplitInfo) -> Result { + unimplemented!() + } + + #[tracing::instrument(level = "debug", skip(ctx, batch_tx))] + async fn copy_reader_with_aligner( + ctx: Arc, + split_info: SplitInfo, + batch_tx: Sender>, + ) -> Result<()> { + tracing::debug!("start"); + let object = ctx.operator.object(&split_info.file_info.path); + let offset = split_info.offset as u64; + let mut reader = object.range_reader(offset..).await?; + loop { + let mut batch = vec![0u8; ctx.read_batch_size]; + let n = read_full(&mut reader, &mut batch[0..]).await?; + if n == 0 { + break; + } else { + batch.truncate(n); + tracing::debug!("read {} bytes", n); + if let Err(e) = batch_tx.send(Ok(batch.into())).await { + tracing::warn!("fail to send ReadBatch: {}", e); + } + } + } + tracing::debug!("finished"); + Ok(()) + } +} + +pub async fn read_full(reader: &mut R, buf: &mut [u8]) -> Result { + let mut buf = &mut buf[0..]; + let mut n = 0; + while !buf.is_empty() { + let read = reader.read(buf).await?; + if read == 0 { + break; + } + n += read; + buf = &mut buf[read..] 
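+ // loop again until the buffer is full or the reader reports EOF (read == 0); callers check the returned n for a short read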
+ } + Ok(n) +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs new file mode 100644 index 0000000000000..db999464d2f5b --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs @@ -0,0 +1,26 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod delimiter; +mod impls; +mod input_context; +mod input_format; +mod input_format_text; +mod input_pipeline; +mod source_aligner; +mod source_deserializer; +mod transform_deserializer; + +pub use input_context::InputContext; +pub use input_format::InputFormat; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs new file mode 100644 index 0000000000000..eb2f0c697f0dc --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs @@ -0,0 +1,172 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
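+
+// The Aligner below is a source processor that handles one split at a time:
+// async_process() pulls the next Split, and then that split's ReadBatches, from
+// async channels; process() runs the format-specific AligningState to cut the
+// raw bytes into RowBatches; event() hands finished RowBatches to the
+// deserializers over a bounded crossbeam channel, using try_send (re-queueing
+// the batch on Full) so the scheduler thread is never blocked.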
+ +use std::any::Any; +use std::collections::VecDeque; +use std::mem; +use std::sync::Arc; + +use common_base::base::tokio::sync::mpsc::Receiver; +use common_exception::ErrorCode; +use common_exception::Result; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use crossbeam_channel::TrySendError; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_pipeline::Split; + +pub struct Aligner { + ctx: Arc, + output: Arc, + + // input + split_rx: async_channel::Receiver>, + + state: Option, + batch_rx: Option>>, + read_batch: Option, + + received_end_batch_of_split: bool, + no_more_split: bool, + + // output + row_batches: VecDeque, + row_batch_tx: crossbeam_channel::Sender, +} + +impl Aligner { + pub(crate) fn try_create( + output: Arc, + ctx: Arc, + split_rx: async_channel::Receiver>, + batch_tx: crossbeam_channel::Sender, + ) -> Result { + Ok(ProcessorPtr::create(Box::new(Self { + ctx, + output, + split_rx, + row_batch_tx: batch_tx, + state: None, + read_batch: None, + batch_rx: None, + received_end_batch_of_split: false, + no_more_split: false, + row_batches: Default::default(), + }))) + } +} + +#[async_trait::async_trait] +impl Processor for Aligner { + fn name(&self) -> &'static str { + "Aligner" + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.no_more_split && self.row_batches.is_empty() && self.read_batch.is_none() { + self.output.finish(); + Ok(Event::Finished) + } else if let Some(rb) = self.row_batches.pop_front() { + match self.row_batch_tx.try_send(rb) { + Ok(()) => { + tracing::debug!("aligner send row batch ok"); + self.output.push_data(Err(ErrorCode::Ok(""))); + Ok(Event::NeedConsume) + } + Err(TrySendError::Full(b)) => { + tracing::debug!("aligner send row batch full"); + self.row_batches.push_front(b); + Ok(Event::NeedConsume) + } + Err(TrySendError::Disconnected(_)) => { + tracing::debug!("aligner send row batch disconnected"); + self.output.finish(); + Ok(Event::Finished) + } + } + } else if self.read_batch.is_some() || self.received_end_batch_of_split { + Ok(Event::Sync) + } else { + Ok(Event::Async) + } + } + + fn process(&mut self) -> Result<()> { + match &mut self.state { + Some(state) => { + let read_batch = mem::take(&mut self.read_batch); + let eof = read_batch.is_none(); + let row_batches = state.align(read_batch)?; + for b in row_batches.into_iter() { + self.row_batches.push_back(b); + } + if eof { + self.state = None; + self.batch_rx = None; + } + self.received_end_batch_of_split = false; + Ok(()) + } + _ => Err(ErrorCode::UnexpectedError("Aligner process state is none")), + } + } + + async fn async_process(&mut self) -> Result<()> { + if !self.no_more_split { + if self.state.is_none() { + match self.split_rx.recv().await { + Ok(split) => { + self.state = Some(I::AligningState::try_create(&self.ctx, &split.info)?); + self.batch_rx = Some(split.rx); + self.received_end_batch_of_split = false; + tracing::debug!( + "aligner recv new split {} {}", + &split.info.file_info.path, + split.info.seq_infile + ); + } + Err(_) => { + tracing::debug!("aligner no more split"); + self.no_more_split = true; + } + 
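+ // with or without a new split, fall through and poll the current split's data channel below (if any)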
} + } + if let Some(rx) = self.batch_rx.as_mut() { + match rx.recv().await { + Some(Ok(batch)) => { + tracing::debug!("aligner recv new batch"); + self.read_batch = Some(batch) + } + Some(Err(e)) => { + return Err(e); + } + None => { + tracing::debug!("aligner recv end of current split"); + self.received_end_batch_of_split = true; + } + } + } + } + Ok(()) + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs new file mode 100644 index 0000000000000..dadd99dd7cdb2 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs @@ -0,0 +1,116 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_exception::Result; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; + +pub struct DeserializeSource { + #[allow(unused)] + output: Arc, + + block_builder: I::BlockBuilder, + input_rx: async_channel::Receiver, + input_buffer: Option, + input_finished: bool, + output_buffer: VecDeque, +} + +impl DeserializeSource { + #[allow(unused)] + pub(crate) fn create( + ctx: Arc, + output: Arc, + rx: async_channel::Receiver, + ) -> Result { + Ok(ProcessorPtr::create(Box::new(Self { + block_builder: I::BlockBuilder::create(ctx), + output, + input_rx: rx, + input_buffer: Default::default(), + input_finished: false, + output_buffer: Default::default(), + }))) + } +} + +#[async_trait::async_trait] +impl Processor for DeserializeSource { + fn name(&self) -> &'static str { + "Deserializer" + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input_buffer = None; + self.input_finished = true; + Ok(Event::Finished) + } else if !self.output.can_push() { + Ok(Event::NeedConsume) + } else { + match self.output_buffer.pop_front() { + Some(data_block) => { + self.output.push_data(Ok(data_block)); + Ok(Event::NeedConsume) + } + None => { + if self.input_buffer.is_some() { + Ok(Event::Sync) + } else { + Ok(Event::Async) + } + } + } + } + } + + fn process(&mut self) -> Result<()> { + if self.input_finished { + assert!(self.input_buffer.is_none()); + } + let blocks = self.block_builder.deserialize(self.input_buffer.take())?; + for b in blocks.into_iter() { + self.output_buffer.push_back(b) + } + Ok(()) + } + + async fn async_process(&mut self) -> Result<()> { + 
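+ // only reached when nothing is buffered: wait for the next RowBatch from the aligners; a closed channel means the aligners are done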
assert!(self.input_buffer.is_none() && !self.input_finished); + match self.input_rx.recv().await { + Ok(row_batch) => { + self.input_buffer = Some(row_batch); + } + Err(_) => { + self.input_finished = true; + } + } + Ok(()) + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/transform_deserializer.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/transform_deserializer.rs new file mode 100644 index 0000000000000..c8ecf6be080f0 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/transform_deserializer.rs @@ -0,0 +1,150 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_exception::Result; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use crossbeam_channel::TryRecvError; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; + +struct DeserializeProcessor { + pub block_builder: I::BlockBuilder, + pub input_buffer: Option, + pub output_buffer: VecDeque, +} + +impl DeserializeProcessor { + pub(crate) fn create(ctx: Arc) -> Result { + Ok(Self { + block_builder: I::BlockBuilder::create(ctx), + input_buffer: Default::default(), + output_buffer: Default::default(), + }) + } + + fn process(&mut self) -> Result<()> { + let blocks = self.block_builder.deserialize(self.input_buffer.take())?; + for b in blocks.into_iter() { + self.output_buffer.push_back(b) + } + Ok(()) + } +} + +pub struct DeserializeTransformer { + processor: DeserializeProcessor, + input: Arc, + output: Arc, + rx: crossbeam_channel::Receiver, + flushing: bool, +} + +impl DeserializeTransformer { + pub(crate) fn create( + ctx: Arc, + input: Arc, + output: Arc, + rx: crossbeam_channel::Receiver, + ) -> Result { + let processor = DeserializeProcessor::create(ctx)?; + Ok(ProcessorPtr::create(Box::new(Self { + processor, + input, + output, + rx, + flushing: false, + }))) + } +} + +#[async_trait::async_trait] +impl Processor for DeserializeTransformer { + fn name(&self) -> &'static str { + "DeserializeTransformer" + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + Ok(Event::Finished) + } else if !self.output.can_push() { + self.input.set_not_need_data(); + Ok(Event::NeedConsume) + } else { + match self.processor.output_buffer.pop_front() { + Some(data_block) => { + tracing::info!("DeserializeTransformer push rows {}", data_block.num_rows()); + 
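+ // a deserialized block is ready: push it to the output port and yield until downstream consumes it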
self.output.push_data(Ok(data_block)); + Ok(Event::NeedConsume) + } + None => { + if self.processor.input_buffer.is_some() { + Ok(Event::Sync) + } else { + if self.input.has_data() { + self.input.pull_data(); + match self.rx.try_recv() { + Ok(read_batch) => { + self.processor.input_buffer = Some(read_batch); + return Ok(Event::Sync); + } + Err(TryRecvError::Disconnected) => { + tracing::warn!("DeserializeTransformer rx disconnected"); + self.input.finish(); + self.flushing = true; + return Ok(Event::Finished); + } + Err(TryRecvError::Empty) => { + // do nothing + } + } + } + // !has_data() or try_recv return Empty + if self.input.is_finished() { + if self.flushing { + self.output.finish(); + Ok(Event::Finished) + } else { + self.flushing = true; + Ok(Event::Sync) + } + } else { + self.input.set_need_data(); + Ok(Event::NeedData) + } + } + } + } + } + } + + fn process(&mut self) -> Result<()> { + self.processor.process() + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/mod.rs b/src/query/pipeline/sources/src/processors/sources/mod.rs index d8283bf320b34..f11656c1fce2c 100644 --- a/src/query/pipeline/sources/src/processors/sources/mod.rs +++ b/src/query/pipeline/sources/src/processors/sources/mod.rs @@ -17,6 +17,7 @@ pub mod blocks_source; pub mod deserializer; pub mod empty_source; pub mod file_splitter; +pub mod input_formats; pub mod multi_file_splitter; mod one_block_source; pub mod stream_source; diff --git a/src/query/service/src/interpreters/interpreter_copy_v2.rs b/src/query/service/src/interpreters/interpreter_copy_v2.rs index 811591719d421..63d67b95cfabe 100644 --- a/src/query/service/src/interpreters/interpreter_copy_v2.rs +++ b/src/query/service/src/interpreters/interpreter_copy_v2.rs @@ -269,6 +269,7 @@ impl CopyInterpreterV2 { tracing::info!("copy_files_to_table from source: {:?}", read_source_plan); let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?; + from_table.read_partitions(self.ctx.clone(), None).await?; from_table.read2( self.ctx.clone(), &read_source_plan, diff --git a/src/query/service/src/storages/stage/stage_source.rs b/src/query/service/src/storages/stage/stage_source.rs index 5659e4dd8cb48..12ce68f922644 100644 --- a/src/query/service/src/storages/stage/stage_source.rs +++ b/src/query/service/src/storages/stage/stage_source.rs @@ -82,6 +82,7 @@ impl StageSourceHelper { } else { OperatorInfo::Cfg(stage_info.stage_params.storage.clone()) }; + let src = StageSourceHelper { ctx, operator_info, diff --git a/src/query/service/src/storages/stage/stage_table.rs b/src/query/service/src/storages/stage/stage_table.rs index 9ab72b8d067a2..7cc77b3f4bfcb 100644 --- a/src/query/service/src/storages/stage/stage_table.rs +++ b/src/query/service/src/storages/stage/stage_table.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use std::any::Any; -use std::collections::VecDeque; use std::str::FromStr; use std::sync::Arc; @@ -29,15 +28,14 @@ use common_legacy_planners::Statistics; use common_meta_app::schema::TableInfo; use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::SinkPipeBuilder; +use common_pipeline_sources::processors::sources::input_formats::InputContext; use parking_lot::Mutex; use tracing::info; use super::StageSourceHelper; -use crate::pipelines::processors::port::OutputPort; use crate::pipelines::processors::ContextSink; use crate::pipelines::processors::TransformLimit; use crate::pipelines::Pipeline; -use crate::pipelines::SourcePipeBuilder; use crate::sessions::TableContext; use crate::storages::Table; @@ -47,6 +45,7 @@ pub struct StageTable { // But the Table trait need it: // fn get_table_info(&self) -> &TableInfo). table_info_placeholder: TableInfo, + input_context: Mutex>>, } impl StageTable { @@ -56,8 +55,14 @@ impl StageTable { Ok(Arc::new(Self { table_info, table_info_placeholder, + input_context: Default::default(), })) } + + fn get_input_context(&self) -> Option> { + let guard = self.input_context.lock(); + guard.clone() + } } #[async_trait::async_trait] @@ -73,39 +78,35 @@ impl Table for StageTable { async fn read_partitions( &self, - _ctx: Arc, + ctx: Arc, _push_downs: Option, ) -> Result<(Statistics, Partitions)> { + let operator = StageSourceHelper::get_op(&ctx, &self.table_info.stage_info).await?; + let input_ctx = Arc::new( + InputContext::try_create_from_copy( + operator, + ctx.get_settings().clone(), + ctx.get_format_settings()?, + self.table_info.schema.clone(), + self.table_info.stage_info.clone(), + self.table_info.files.clone(), + ctx.get_scan_progress(), + ) + .await?, + ); + let mut guard = self.input_context.lock(); + *guard = Some(input_ctx); Ok((Statistics::default(), vec![])) } fn read2( &self, - ctx: Arc, + _ctx: Arc, _plan: &ReadDataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { - let settings = ctx.get_settings(); - let mut builder = SourcePipeBuilder::create(); - let table_info = &self.table_info; - let schema = table_info.schema.clone(); - let mut files_deque = VecDeque::with_capacity(table_info.files.len()); - for f in &table_info.files { - files_deque.push_back(f.to_string()); - } - let files = Arc::new(Mutex::new(files_deque)); - - let stage_source = StageSourceHelper::try_create(ctx, schema, table_info.clone(), files)?; - - for _index in 0..settings.get_max_threads()? 
{ - let output = OutputPort::create(); - builder.add_source(output.clone(), stage_source.get_splitter(output)?); - } - pipeline.add_pipe(builder.finalize()); - - pipeline.add_transform(|transform_input_port, transform_output_port| { - stage_source.get_deserializer(transform_input_port, transform_output_port) - })?; + let input_ctx = self.get_input_context().unwrap(); + input_ctx.format.exec_copy(input_ctx.clone(), pipeline)?; let limit = self.table_info.stage_info.copy_options.size_limit; if limit > 0 { diff --git a/src/query/service/tests/it/storages/system/settings_table.rs b/src/query/service/tests/it/storages/system/settings_table.rs index a00d42ff34ec5..f11054aef203e 100644 --- a/src/query/service/tests/it/storages/system/settings_table.rs +++ b/src/query/service/tests/it/storages/system/settings_table.rs @@ -42,6 +42,7 @@ async fn test_settings_table() -> Result<()> { "| enable_planner_v2 | 1 | 1 | SESSION | Enable planner v2 by setting this variable to 1, default value: 1 | UInt64 |", "| field_delimiter | , | , | SESSION | Format field delimiter, default value: , | String |", "| flight_client_timeout | 60 | 60 | SESSION | Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds | UInt64 |", + "| input_read_buffer_size | 1048576 | 1048576 | SESSION | The size of buffer in bytes for input with format. By default, it is 1MB. | UInt64 |", "| group_by_two_level_threshold | 10000 | 10000 | SESSION | The threshold of keys to open two-level aggregation, default value: 10000 | UInt64 |", "| max_block_size | 10000 | 10000 | SESSION | Maximum block size for reading | UInt64 |", "| max_execute_time | 0 | 0 | SESSION | The maximum query execution time. it means no limit if the value is zero. default value: 0 | UInt64 |", diff --git a/src/query/settings/src/lib.rs b/src/query/settings/src/lib.rs index d14ad02bf5644..c1692883a80b9 100644 --- a/src/query/settings/src/lib.rs +++ b/src/query/settings/src/lib.rs @@ -152,6 +152,16 @@ impl Settings { desc: "The size of buffer in bytes for buffered reader of dal. By default, it is 1MB.", possible_values: None, }, + SettingValue { + default_value: UserSettingValue::UInt64(1024 * 1024), + user_setting: UserSetting::create( + "input_read_buffer_size", + UserSettingValue::UInt64(1024 * 1024), + ), + level: ScopeLevel::Session, + desc: "The size of buffer in bytes for input with format. By default, it is 1MB.", + possible_values: None, + }, // enable_new_processor_framework SettingValue { default_value: UserSettingValue::UInt64(1), @@ -370,6 +380,11 @@ impl Settings { self.try_get_u64(key) } + pub fn get_input_read_buffer_size(&self) -> Result { + let key = "input_read_buffer_size"; + self.try_get_u64(key) + } + pub fn get_enable_new_processor_framework(&self) -> Result { let key = "enable_new_processor_framework"; self.try_get_u64(key) diff --git a/tests/logictest/suites/base/06_show/06_0003_show_settings b/tests/logictest/suites/base/06_show/06_0003_show_settings index d58223a13c5c8..399187f008dc6 100644 --- a/tests/logictest/suites/base/06_show/06_0003_show_settings +++ b/tests/logictest/suites/base/06_show/06_0003_show_settings @@ -17,6 +17,7 @@ enable_planner_v2 1 1 SESSION Enable planner v2 by setting this variable to field_delimiter , , SESSION Format field delimiter, default value: , String flight_client_timeout 60 60 SESSION Max duration the flight client request is allowed to take in seconds. 
By default, it is 60 seconds UInt64 group_by_two_level_threshold 10000 10000 SESSION The threshold of keys to open two-level aggregation, default value: 10000 UInt64 +input_read_buffer_size 1048576 1048576 SESSION The size of buffer in bytes for input with format. By default, it is 1MB. UInt64 max_block_size 10000 10000 SESSION Maximum block size for reading UInt64 max_execute_time 0 0 SESSION The maximum query execution time. it means no limit if the value is zero. default value: 0 UInt64 max_threads 11 16 SESSION The maximum number of threads to execute the request. By default, it is determined automatically. UInt64 diff --git a/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.result b/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.result index a3fc5d28a909a..e8a8335427185 100644 --- a/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.result +++ b/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.result @@ -170,6 +170,8 @@ my loving than multing ведомосквы вместу ведомосквы вместу ====== SQL26 ====== +'kbnyjuj gjhnf gtgthm vfibys row 3 ставе +'kbnyjuj gjhnf gtgthm vfibys row 3 ставе /topic,6 на карта /topic,6 на карта 1 родильник @@ -178,8 +180,6 @@ my loving than multing 1 родильник 1 родный 1 родный -1 розник -1 розник ====== SQL27 ====== армянск армянск diff --git a/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh b/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh index b26b9b8fade48..a0f9f97c442bd 100755 --- a/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh +++ b/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh @@ -9,7 +9,7 @@ cat $CURDIR/../ddl/hits.sql | $MYSQL_CLIENT_CONNECT hits_statements=( ## load data - "COPY INTO hits FROM 'https://repo.databend.rs/dataset/stateful/hits_100k.tsv' FILE_FORMAT = ( type = 'CSV' field_delimiter = '\t' record_delimiter = '\n' skip_header = 1 );" + "COPY INTO hits FROM 'https://repo.databend.rs/dataset/stateful/hits_100k.tsv' FILE_FORMAT = ( type = 'tsv' record_delimiter = '\n' skip_header = 1 );" ## run test "SELECT '====== SQL1 ======';" "SELECT COUNT(*) FROM hits;" @@ -22,7 +22,8 @@ hits_statements=( "SELECT '====== SQL5 ======';" "SELECT COUNT(DISTINCT UserID) FROM hits;" "SELECT '====== SQL6 ======';" - "SELECT COUNT(DISTINCT SearchPhrase) FROM hits;" + #"SELECT COUNT(DISTINCT SearchPhrase) FROM hits;" # wait for bugfix https://github.com/datafuselabs/databend/issues/7743 + "SELECT COUNT(DISTINCT SearchPhrase) FROM (select SearchPhrase from hits order by SearchPhrase)" "SELECT '====== SQL7 ======';" "SELECT MIN(EventDate), MAX(EventDate) FROM hits;" "SELECT '====== SQL8 ======';"