diff --git a/Cargo.lock b/Cargo.lock
index 788074162b968..2a3334d7f5c6a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5088,6 +5088,9 @@ dependencies = [
 name = "databend-storages-common-blocks"
 version = "0.1.0"
 dependencies = [
+ "arrow-array 51.0.0",
+ "arrow-buffer 51.0.0",
+ "arrow-schema 51.0.0",
  "databend-common-exception",
  "databend-common-expression",
  "databend-storages-common-table-meta",
diff --git a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs
index a182e1759430c..a386fafbf713f 100644
--- a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs
+++ b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs
@@ -183,7 +183,13 @@ async fn materialize_virtual_columns(
     let virtual_block = DataBlock::new(virtual_columns, len);
 
     let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-    let _ = serialize_block(write_settings, &virtual_schema, virtual_block, &mut buffer)?;
+    let _ = serialize_block(
+        write_settings,
+        &virtual_schema,
+        virtual_block,
+        &mut buffer,
+        &Default::default(),
+    )?;
 
     write_data(buffer, operator, location).await?;
 
diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs
index 47f2211d70cb6..279d44524bf56 100644
--- a/src/query/expression/src/converts/arrow/to.rs
+++ b/src/query/expression/src/converts/arrow/to.rs
@@ -170,8 +170,7 @@ impl DataBlock {
 impl Column {
     pub fn into_arrow_rs(self) -> Arc<dyn arrow_array::Array> {
         let arrow2_array: Box<dyn databend_common_arrow::arrow::array::Array> = self.as_arrow();
-        let arrow_array: Arc<dyn arrow_array::Array> = arrow2_array.into();
-        arrow_array
+        arrow2_array.into()
     }
 }
 
@@ -200,7 +199,7 @@ fn arrow_field_from_arrow2_field(field: Arrow2Field) -> ArrowField {
         Arrow2DataType::Struct(f) => {
             ArrowDataType::Struct(f.into_iter().map(arrow_field_from_arrow2_field).collect())
         }
-        other => other.into(),
+        other => ArrowDataType::from(other),
     };
 
     ArrowField::new(field.name, data_type, field.is_nullable).with_metadata(metadata)
diff --git a/src/query/formats/src/output_format/parquet.rs b/src/query/formats/src/output_format/parquet.rs
index 589a76250edba..222644fd31c13 100644
--- a/src/query/formats/src/output_format/parquet.rs
+++ b/src/query/formats/src/output_format/parquet.rs
@@ -53,7 +53,13 @@ impl OutputFormat for ParquetOutputFormat {
             return Ok(vec![]);
         }
         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd)?;
+        let _ = blocks_to_parquet(
+            &self.schema,
+            blocks,
+            &mut buf,
+            TableCompression::Zstd,
+            &Default::default(),
+        )?;
         Ok(buf)
     }
 }
diff --git a/src/query/service/src/test_kits/block_writer.rs b/src/query/service/src/test_kits/block_writer.rs
index 2654c246f5c25..559e087bcac77 100644
--- a/src/query/service/src/test_kits/block_writer.rs
+++ b/src/query/service/src/test_kits/block_writer.rs
@@ -75,7 +75,7 @@ impl<'a> BlockWriter<'a> {
         };
 
         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let col_metas = serialize_block(&write_settings, schema, block, &mut buf)?;
+        let col_metas = serialize_block(&write_settings, schema, block, &mut buf, &col_stats)?;
         let file_size = buf.len() as u64;
         data_accessor.write(&location.0, buf).await?;
 
@@ -126,6 +126,7 @@ impl<'a> BlockWriter<'a> {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            &Default::default(),
         )?;
         let size = data.len() as u64;
         data_accessor.write(&location.0, data).await?;
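Note on the mechanical call-site changes above and below: `serialize_block` and `blocks_to_parquet` now take a `&StatisticsOfColumns`, and call sites that have no per-column statistics at hand pass `&Default::default()`. A minimal std-only sketch of why that type-checks, assuming (as the usage suggests) that `StatisticsOfColumns` is a map keyed by column id — the type names below are simplified stand-ins, not the real table_meta definitions:

use std::collections::HashMap;

// Stand-ins for the table_meta types (assumption: the real
// StatisticsOfColumns is a HashMap keyed by column id).
type ColumnId = u32;
#[derive(Default)]
struct ColumnStatistics {
    distinct_of_values: Option<u64>,
}
type StatisticsOfColumns = HashMap<ColumnId, ColumnStatistics>;

// With an empty map every lookup misses, so encoding selection falls back
// to the per-array heuristics introduced in this patch.
fn serialize(stats: &StatisticsOfColumns) {
    assert!(stats.get(&0).is_none());
}

fn main() {
    // `&Default::default()` at those call sites resolves to this empty map.
    serialize(&StatisticsOfColumns::default());
}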
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
index 383743788863c..201426d632614 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
@@ -760,7 +760,8 @@ impl CompactSegmentTestFixture {
             };
 
             let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-            let col_metas = serialize_block(&write_settings, &schema, block, &mut buf)?;
+            let col_metas =
+                serialize_block(&write_settings, &schema, block, &mut buf, &col_stats)?;
             let file_size = buf.len() as u64;
             data_accessor.write(&location.0, buf).await?;
 
diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml
index 421966921de11..ac9cc4763e221 100644
--- a/src/query/storages/common/blocks/Cargo.toml
+++ b/src/query/storages/common/blocks/Cargo.toml
@@ -12,10 +12,12 @@ doctest = false
 test = true
 
 [dependencies]
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
+arrow-schema = { workspace = true }
 databend-common-exception = { path = "../../../../common/exception" }
 databend-common-expression = { path = "../../../expression" }
-parquet_rs = { workspace = true }
-
 databend-storages-common-table-meta = { path = "../table_meta" }
+parquet_rs = { workspace = true }
 
 [build-dependencies]
diff --git a/src/query/storages/common/blocks/src/codec/byte_array.rs b/src/query/storages/common/blocks/src/codec/byte_array.rs
new file mode 100644
index 0000000000000..7002fb583f77b
--- /dev/null
+++ b/src/query/storages/common/blocks/src/codec/byte_array.rs
@@ -0,0 +1,90 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow_array::types::ByteArrayType;
+use arrow_array::types::LargeBinaryType;
+use arrow_array::types::LargeUtf8Type;
+use arrow_array::Array;
+use arrow_array::GenericByteArray;
+use arrow_schema::DataType as ArrowDataType;
+use databend_common_exception::Result;
+use databend_storages_common_table_meta::meta::ColumnStatistics;
+use parquet_rs::basic::Encoding;
+use parquet_rs::file::properties::WriterPropertiesBuilder;
+use parquet_rs::schema::types::ColumnPath;
+
+const ROWS_PER_DISTINCT_THRESHOLD: f64 = 10.0;
+const SAMPLE_ROWS: usize = 1000;
+const AVERAGE_PREFIX_LEN_THRESHOLD: f64 = 8.0;
+
+pub fn choose_byte_array_encoding(
+    mut props: WriterPropertiesBuilder,
+    stat: Option<&ColumnStatistics>,
+    array: Arc<dyn Array>,
+    column_name: &str,
+) -> Result<WriterPropertiesBuilder> {
+    if array.is_empty() {
+        return Ok(props);
+    }
+    let col_path = ColumnPath::new(vec![column_name.to_string()]);
+    let ndv = stat.as_ref().and_then(|s| s.distinct_of_values);
+    let num_rows = array.len();
+    if let Some(ndv) = ndv {
+        if num_rows as f64 / ndv as f64 > ROWS_PER_DISTINCT_THRESHOLD {
+            props = props.set_column_dictionary_enabled(col_path, true);
+            return Ok(props);
+        }
+    }
+    let data_type = array.data_type();
+    match data_type {
+        ArrowDataType::LargeBinary => {
+            if can_apply_delta_byte_array::<LargeBinaryType>(&array)? {
+                props = props.set_column_encoding(col_path, Encoding::DELTA_BYTE_ARRAY);
+                return Ok(props);
+            }
+        }
+        ArrowDataType::LargeUtf8 => {
+            if can_apply_delta_byte_array::<LargeUtf8Type>(&array)? {
+                props = props.set_column_encoding(col_path, Encoding::DELTA_BYTE_ARRAY);
+                return Ok(props);
+            }
+        }
+        _ => {}
+    };
+    props = props.set_column_encoding(col_path, Encoding::DELTA_LENGTH_BYTE_ARRAY);
+    Ok(props)
+}
+
+fn can_apply_delta_byte_array<T: ByteArrayType>(array: &dyn Array) -> Result<bool> {
+    let num_rows = array.len();
+    let array = array
+        .as_any()
+        .downcast_ref::<GenericByteArray<T>>()
+        .unwrap();
+    let mut sum_prefix_len = 0;
+    for i in 1..num_rows.min(SAMPLE_ROWS) {
+        let last: &[u8] = array.value(i - 1).as_ref();
+        let cur: &[u8] = array.value(i).as_ref();
+        let prefix_len = last
+            .iter()
+            .zip(cur.iter())
+            .take_while(|(a, b)| a == b)
+            .count();
+        sum_prefix_len += prefix_len;
+    }
+    let avg_prefix_len = sum_prefix_len as f64 / num_rows as f64;
+    Ok(avg_prefix_len > AVERAGE_PREFIX_LEN_THRESHOLD)
+}
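The DELTA_BYTE_ARRAY check above estimates how much adjacent values share as a prefix. A self-contained sketch of the same sampling idea (std only; it averages over the sampled pair count, a simplification of the function above, which divides by `num_rows`):

const SAMPLE_ROWS: usize = 1000;
const AVERAGE_PREFIX_LEN_THRESHOLD: f64 = 8.0;

// Average length of the byte prefix shared by consecutive values, measured
// over at most SAMPLE_ROWS leading rows.
fn avg_shared_prefix(values: &[&[u8]]) -> f64 {
    let n = values.len().min(SAMPLE_ROWS);
    if n < 2 {
        return 0.0;
    }
    let sum: usize = values
        .windows(2)
        .take(n - 1)
        .map(|w| w[0].iter().zip(w[1]).take_while(|(a, b)| a == b).count())
        .sum();
    sum as f64 / (n - 1) as f64
}

fn main() {
    // Sorted keys, URLs, and file paths tend to clear the threshold.
    let urls: Vec<&[u8]> = vec![&b"https://example.com/a"[..], &b"https://example.com/b"[..]];
    assert!(avg_shared_prefix(&urls) > AVERAGE_PREFIX_LEN_THRESHOLD);
}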
diff --git a/src/query/storages/common/blocks/src/codec/choose.rs b/src/query/storages/common/blocks/src/codec/choose.rs
new file mode 100644
index 0000000000000..a1a031fcf508d
--- /dev/null
+++ b/src/query/storages/common/blocks/src/codec/choose.rs
@@ -0,0 +1,60 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+use databend_common_expression::TableSchema;
+use databend_storages_common_table_meta::meta::StatisticsOfColumns;
+use parquet_rs::basic::Type as PhysicalType;
+use parquet_rs::file::properties::WriterPropertiesBuilder;
+use parquet_rs::schema::types::Type;
+use parquet_rs::schema::types::TypePtr;
+
+use super::byte_array::choose_byte_array_encoding;
+use super::int::choose_int_encoding;
+
+pub fn choose_codec(
+    mut props: WriterPropertiesBuilder,
+    block: &DataBlock,
+    parquet_fields: &[TypePtr],
+    table_schema: &TableSchema,
+    stat: &StatisticsOfColumns,
+) -> Result<WriterPropertiesBuilder> {
+    for ((parquet_field, table_field), entry) in parquet_fields
+        .iter()
+        .zip(table_schema.fields.iter())
+        .zip(block.columns())
+    {
+        let column = entry.to_column(block.num_rows());
+        let array = column.into_arrow_rs();
+        let stat = stat.get(&table_field.column_id);
+        let column_name = table_field.name.as_str();
+        match parquet_field.as_ref() {
+            Type::PrimitiveType { physical_type, .. } => match physical_type {
+                PhysicalType::BYTE_ARRAY => {
+                    props = choose_byte_array_encoding(props, stat, array, column_name)?;
+                }
+                PhysicalType::INT32 | PhysicalType::INT64 => {
+                    props = choose_int_encoding(props, stat, array, column_name)?
+                }
+                _ => {}
+            },
+            Type::GroupType {
+                basic_info: _,
+                fields: _,
+            } => {} // TODO: handle nested fields
+        }
+    }
+    Ok(props)
+}
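`choose_codec` only ever sets column-level writer properties, relying on the parquet writer to let them shadow the global `PLAIN`/no-dictionary defaults configured in `blocks_to_parquet` below. A small demonstration of that precedence against the parquet crate directly (published as `parquet`, imported in this workspace under the alias `parquet_rs`; the column name `c1` is made up):

use parquet::basic::Encoding;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn main() {
    let col = ColumnPath::new(vec!["c1".to_string()]);
    let props = WriterProperties::builder()
        // global defaults, as in blocks_to_parquet
        .set_encoding(Encoding::PLAIN)
        .set_dictionary_enabled(false)
        // column-level override, as produced by choose_codec
        .set_column_encoding(col.clone(), Encoding::DELTA_BINARY_PACKED)
        .build();
    // The column-level setting wins for that column.
    assert_eq!(props.encoding(&col), Some(Encoding::DELTA_BINARY_PACKED));
}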
diff --git a/src/query/storages/common/blocks/src/codec/int.rs b/src/query/storages/common/blocks/src/codec/int.rs
new file mode 100644
index 0000000000000..f405b5d0c7ee6
--- /dev/null
+++ b/src/query/storages/common/blocks/src/codec/int.rs
@@ -0,0 +1,85 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow_array::types::ArrowPrimitiveType;
+use arrow_array::types::Date32Type;
+use arrow_array::types::Decimal128Type;
+use arrow_array::types::Decimal256Type;
+use arrow_array::types::Int32Type;
+use arrow_array::types::Int64Type;
+use arrow_array::types::TimestampMicrosecondType;
+use arrow_array::types::UInt32Type;
+use arrow_array::types::UInt64Type;
+use arrow_array::Array;
+use arrow_array::ArrowNativeTypeOp;
+use arrow_array::PrimitiveArray;
+use arrow_buffer::ArrowNativeType;
+use arrow_schema::DataType as ArrowDataType;
+use arrow_schema::TimeUnit;
+use databend_common_exception::Result;
+use databend_storages_common_table_meta::meta::ColumnStatistics;
+use parquet_rs::basic::Encoding;
+use parquet_rs::file::properties::WriterPropertiesBuilder;
+use parquet_rs::schema::types::ColumnPath;
+
+const MAX_WIDTH_THRESHOLD: i64 = 3;
+
+pub fn choose_int_encoding(
+    mut props: WriterPropertiesBuilder,
+    _stat: Option<&ColumnStatistics>,
+    array: Arc<dyn Array>,
+    column_name: &str,
+) -> Result<WriterPropertiesBuilder> {
+    if array.is_empty() {
+        return Ok(props);
+    }
+    let col_path = ColumnPath::new(vec![column_name.to_string()]);
+    let data_type = array.data_type();
+    let apply_delta = match data_type {
+        ArrowDataType::Int32 => can_apply_delta_binary_pack::<Int32Type>(&array)?,
+        ArrowDataType::Int64 => can_apply_delta_binary_pack::<Int64Type>(&array)?,
+        ArrowDataType::UInt32 => can_apply_delta_binary_pack::<UInt32Type>(&array)?,
+        ArrowDataType::UInt64 => can_apply_delta_binary_pack::<UInt64Type>(&array)?,
+        ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
+            can_apply_delta_binary_pack::<TimestampMicrosecondType>(&array)?
+        }
+        ArrowDataType::Date32 => can_apply_delta_binary_pack::<Date32Type>(&array)?,
+        ArrowDataType::Decimal128(_, _) => can_apply_delta_binary_pack::<Decimal128Type>(&array)?,
+        ArrowDataType::Decimal256(_, _) => can_apply_delta_binary_pack::<Decimal256Type>(&array)?,
+        _ => false,
+    };
+    if apply_delta {
+        props = props.set_column_encoding(col_path, Encoding::DELTA_BINARY_PACKED);
+    }
+    Ok(props)
+}
+
+fn can_apply_delta_binary_pack<T: ArrowPrimitiveType>(array: &dyn Array) -> Result<bool> {
+    let mut max_delta = T::Native::MIN_TOTAL_ORDER;
+    let mut min_delta = T::Native::MAX_TOTAL_ORDER;
+    let array = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
+    for i in 1..array.len() {
+        let delta = array.value(i).sub_wrapping(array.value(i - 1));
+        if delta.is_gt(max_delta) {
+            max_delta = delta;
+        }
+        if delta.is_lt(min_delta) {
+            min_delta = delta;
+        }
+    }
+    let x = max_delta.sub_wrapping(min_delta).as_usize();
+    Ok(x <= (1 << MAX_WIDTH_THRESHOLD))
+}
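A std-only restatement of the width test in `can_apply_delta_binary_pack`: DELTA_BINARY_PACKED is chosen when the spread between the largest and smallest consecutive delta fits in a few bits (`1 << MAX_WIDTH_THRESHOLD`, i.e. 8, mirroring the constant above):

const MAX_WIDTH_THRESHOLD: u32 = 3;

fn deltas_are_narrow(values: &[i64]) -> bool {
    if values.len() < 2 {
        return false;
    }
    let (mut min_d, mut max_d) = (i64::MAX, i64::MIN);
    for w in values.windows(2) {
        let d = w[1].wrapping_sub(w[0]);
        min_d = min_d.min(d);
        max_d = max_d.max(d);
    }
    // Same criterion as above: the delta range must fit in 2^3 = 8.
    (max_d.wrapping_sub(min_d) as u64) <= (1 << MAX_WIDTH_THRESHOLD)
}

fn main() {
    // Near-monotonic sequences (ids, timestamps) have a narrow delta range.
    assert!(deltas_are_narrow(&[100, 101, 103, 104, 106]));
    // Scattered values do not.
    assert!(!deltas_are_narrow(&[5, 900, -37, 12_000]));
}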
diff --git a/src/query/storages/common/blocks/src/codec/mod.rs b/src/query/storages/common/blocks/src/codec/mod.rs
new file mode 100644
index 0000000000000..11508ef2490a0
--- /dev/null
+++ b/src/query/storages/common/blocks/src/codec/mod.rs
@@ -0,0 +1,19 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod byte_array;
+mod choose;
+mod int;
+
+pub use choose::choose_codec;
diff --git a/src/query/storages/common/blocks/src/lib.rs b/src/query/storages/common/blocks/src/lib.rs
index 722d2d20c6d89..b80b30b552de8 100644
--- a/src/query/storages/common/blocks/src/lib.rs
+++ b/src/query/storages/common/blocks/src/lib.rs
@@ -14,5 +14,6 @@
 
 #![allow(clippy::uninlined_format_args)]
 
+mod codec;
 mod parquet_rs;
 pub use parquet_rs::blocks_to_parquet;
diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs
index 440caa22cd2fb..13faab7e8b8e0 100644
--- a/src/query/storages/common/blocks/src/parquet_rs.rs
+++ b/src/query/storages/common/blocks/src/parquet_rs.rs
@@ -18,37 +18,56 @@ use databend_common_exception::Result;
 use databend_common_expression::converts::arrow::table_schema_to_arrow_schema_ignore_inside_nullable;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableSchema;
+use databend_storages_common_table_meta::meta::StatisticsOfColumns;
 use databend_storages_common_table_meta::table::TableCompression;
+use parquet_rs::arrow::arrow_to_parquet_schema;
 use parquet_rs::arrow::ArrowWriter;
 use parquet_rs::basic::Encoding;
 use parquet_rs::file::properties::EnabledStatistics;
 use parquet_rs::file::properties::WriterProperties;
 use parquet_rs::format::FileMetaData;
 
+use crate::codec::choose_codec;
+
 /// Serialize data blocks to parquet format.
 pub fn blocks_to_parquet(
     table_schema: &TableSchema,
     blocks: Vec<DataBlock>,
     write_buffer: &mut Vec<u8>,
     compression: TableCompression,
+    stat: &StatisticsOfColumns,
 ) -> Result<FileMetaData> {
     assert!(!blocks.is_empty());
-    let props = WriterProperties::builder()
+    let mut props_builder = WriterProperties::builder()
         .set_compression(compression.into())
         // use `usize::MAX` to effectively limit the number of row groups to 1
         .set_max_row_group_size(usize::MAX)
+        // a global setting, overridden by the column-level setting (if set)
         .set_encoding(Encoding::PLAIN)
+        // ditto
        .set_dictionary_enabled(false)
         .set_statistics_enabled(EnabledStatistics::None)
-        .set_bloom_filter_enabled(false)
-        .build();
+        .set_bloom_filter_enabled(false);
+    let arrow_schema = Arc::new(table_schema_to_arrow_schema_ignore_inside_nullable(
+        table_schema,
+    ));
+    let parquet_schema = arrow_to_parquet_schema(&arrow_schema)?;
+    if blocks.len() == 1 {
+        // does not cover the case of multiple blocks for now, to simplify the implementation
+        props_builder = choose_codec(
+            props_builder,
+            &blocks[0],
+            parquet_schema.root_schema().get_fields(),
+            table_schema,
+            stat,
+        )?;
+    }
+    let props = props_builder.build();
     let batches = blocks
         .into_iter()
         .map(|block| block.to_record_batch(table_schema))
         .collect::<Result<Vec<_>>>()?;
-    let arrow_schema = Arc::new(table_schema_to_arrow_schema_ignore_inside_nullable(
-        table_schema,
-    ));
+
     let mut writer = ArrowWriter::try_new(write_buffer, arrow_schema, Some(props))?;
     for batch in batches {
         writer.write(&batch)?;
diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs
index f56d1756293de..66bf783458422 100644
--- a/src/query/storages/fuse/src/io/write/block_writer.rs
+++ b/src/query/storages/fuse/src/io/write/block_writer.rs
@@ -39,6 +39,7 @@ use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::ClusterStatistics;
 use databend_storages_common_table_meta::meta::ColumnMeta;
 use databend_storages_common_table_meta::meta::Location;
+use databend_storages_common_table_meta::meta::StatisticsOfColumns;
 use databend_storages_common_table_meta::table::TableCompression;
 use opendal::Operator;
 
@@ -49,19 +50,24 @@ use crate::operations::column_parquet_metas;
 use crate::statistics::gen_columns_statistics;
 use crate::statistics::ClusterStatsGenerator;
 use crate::FuseStorageFormat;
-
 // TODO rename this, it is serialization, or pass in a writer(if not rename)
 pub fn serialize_block(
     write_settings: &WriteSettings,
     schema: &TableSchemaRef,
     block: DataBlock,
     buf: &mut Vec<u8>,
+    stat: &StatisticsOfColumns,
 ) -> Result<HashMap<ColumnId, ColumnMeta>> {
     let schema = Arc::new(schema.remove_virtual_computed_fields());
     match write_settings.storage_format {
         FuseStorageFormat::Parquet => {
-            let result =
-                blocks_to_parquet(&schema, vec![block], buf, write_settings.table_compression)?;
+            let result = blocks_to_parquet(
+                &schema,
+                vec![block],
+                buf,
+                write_settings.table_compression,
+                stat,
+            )?;
             let meta = column_parquet_metas(&result, &schema)?;
             Ok(meta)
         }
@@ -143,6 +149,7 @@ impl BloomIndexState {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            &Default::default(),
         )?;
         let data_size = data.len() as u64;
         Ok(Some(Self {
@@ -300,6 +307,7 @@ impl BlockBuilder {
             &self.source_schema,
             data_block,
             &mut buffer,
+            &col_stats,
         )?;
         let file_size = buffer.len() as u64;
         let block_meta = BlockMeta {
diff --git a/src/query/storages/fuse/src/operations/agg_index_sink.rs b/src/query/storages/fuse/src/operations/agg_index_sink.rs
index 3cff22409eae1..51b3a1bc4fe90 100644
--- a/src/query/storages/fuse/src/operations/agg_index_sink.rs
+++ b/src/query/storages/fuse/src/operations/agg_index_sink.rs
@@ -111,7 +111,13 @@ impl AsyncSink for AggIndexSink {
             self.index_id,
         );
         let mut data = vec![];
-        io::serialize_block(&self.write_settings, &self.sink_schema, block, &mut data)?;
+        io::serialize_block(
+            &self.write_settings,
+            &self.sink_schema,
+            block,
+            &mut data,
+            &Default::default(),
+        )?;
         {
             metrics_inc_agg_index_write_nums(1);
diff --git a/src/query/storages/result_cache/src/write/writer.rs b/src/query/storages/result_cache/src/write/writer.rs
index 0ba10167d2762..b3aeffbc5bb38 100644
--- a/src/query/storages/result_cache/src/write/writer.rs
+++ b/src/query/storages/result_cache/src/write/writer.rs
@@ -77,6 +77,7 @@ impl ResultCacheWriter {
             self.blocks.clone(),
             &mut buf,
             TableCompression::None,
+            &Default::default(),
         )?;
 
         let file_location = format!("{}/{}.parquet", self.location, Uuid::new_v4().as_simple());
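Taken together: `blocks_to_parquet` now receives the column statistics collected at write time, and `choose_codec` turns them into per-column encoding choices. The dictionary gate in `choose_byte_array_encoding` reduces to a single ratio; a std-only restatement for reference (`prefer_dictionary` is a hypothetical helper for illustration, not part of the patch):

const ROWS_PER_DISTINCT_THRESHOLD: f64 = 10.0;

// Dictionary encoding pays off when each distinct value repeats often
// enough to amortize the dictionary page.
fn prefer_dictionary(num_rows: usize, ndv: Option<u64>) -> bool {
    match ndv {
        Some(ndv) if ndv > 0 => num_rows as f64 / ndv as f64 > ROWS_PER_DISTINCT_THRESHOLD,
        // No statistics: fall through to the DELTA_* heuristics instead.
        _ => false,
    }
}

fn main() {
    assert!(prefer_dictionary(100_000, Some(50))); // low cardinality
    assert!(!prefer_dictionary(1_000, Some(900))); // nearly unique
}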