From 43befbe507e73505cbe48621a072696cb6c2c30d Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 11 Apr 2024 21:20:12 +0800 Subject: [PATCH 01/11] feat: try enable parquet encoding --- .../fuse/operations/virtual_columns.rs | 8 ++- .../formats/src/output_format/parquet.rs | 8 ++- .../service/src/test_kits/block_writer.rs | 3 +- .../mutation/segments_compact_mutator.rs | 3 +- .../storages/common/blocks/src/parquet_rs.rs | 49 +++++++++++++++++-- .../fuse/src/io/write/block_writer.rs | 14 ++++-- .../fuse/src/operations/agg_index_sink.rs | 8 ++- .../storages/result_cache/src/write/writer.rs | 1 + 8 files changed, 83 insertions(+), 11 deletions(-) diff --git a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs index a182e1759430c..a386fafbf713f 100644 --- a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs +++ b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs @@ -183,7 +183,13 @@ async fn materialize_virtual_columns( let virtual_block = DataBlock::new(virtual_columns, len); let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); - let _ = serialize_block(write_settings, &virtual_schema, virtual_block, &mut buffer)?; + let _ = serialize_block( + write_settings, + &virtual_schema, + virtual_block, + &mut buffer, + &Default::default(), + )?; write_data(buffer, operator, location).await?; diff --git a/src/query/formats/src/output_format/parquet.rs b/src/query/formats/src/output_format/parquet.rs index 589a76250edba..222644fd31c13 100644 --- a/src/query/formats/src/output_format/parquet.rs +++ b/src/query/formats/src/output_format/parquet.rs @@ -53,7 +53,13 @@ impl OutputFormat for ParquetOutputFormat { return Ok(vec![]); } let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); - let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd)?; + let _ = blocks_to_parquet( + &self.schema, + blocks, + &mut buf, + TableCompression::Zstd, + &Default::default(), + )?; Ok(buf) } } diff --git a/src/query/service/src/test_kits/block_writer.rs b/src/query/service/src/test_kits/block_writer.rs index 2654c246f5c25..559e087bcac77 100644 --- a/src/query/service/src/test_kits/block_writer.rs +++ b/src/query/service/src/test_kits/block_writer.rs @@ -75,7 +75,7 @@ impl<'a> BlockWriter<'a> { }; let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); - let col_metas = serialize_block(&write_settings, schema, block, &mut buf)?; + let col_metas = serialize_block(&write_settings, schema, block, &mut buf, &col_stats)?; let file_size = buf.len() as u64; data_accessor.write(&location.0, buf).await?; @@ -126,6 +126,7 @@ impl<'a> BlockWriter<'a> { vec![index_block], &mut data, TableCompression::None, + &Default::default(), )?; let size = data.len() as u64; data_accessor.write(&location.0, data).await?; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs index 071175c52a960..8ff744d141335 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs @@ -765,7 +765,8 @@ impl CompactSegmentTestFixture { }; let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); - let col_metas = serialize_block(&write_settings, &schema, block, &mut buf)?; + let col_metas = + serialize_block(&write_settings, &schema, 
block, &mut buf, &col_stats)?; let file_size = buf.len() as u64; data_accessor.write(&location.0, buf).await?; diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 440caa22cd2fb..0056dee91b0ae 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -18,12 +18,16 @@ use databend_common_exception::Result; use databend_common_expression::converts::arrow::table_schema_to_arrow_schema_ignore_inside_nullable; use databend_common_expression::DataBlock; use databend_common_expression::TableSchema; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; use parquet_rs::arrow::ArrowWriter; +use parquet_rs::basic::Compression; use parquet_rs::basic::Encoding; use parquet_rs::file::properties::EnabledStatistics; use parquet_rs::file::properties::WriterProperties; +use parquet_rs::file::properties::WriterPropertiesBuilder; use parquet_rs::format::FileMetaData; +use parquet_rs::schema::types::ColumnPath; /// Serialize data blocks to parquet format. pub fn blocks_to_parquet( @@ -31,17 +35,24 @@ pub fn blocks_to_parquet( blocks: Vec, write_buffer: &mut Vec, compression: TableCompression, + stat: &StatisticsOfColumns, ) -> Result { assert!(!blocks.is_empty()); - let props = WriterProperties::builder() + let mut props_builder = WriterProperties::builder() .set_compression(compression.into()) // use `usize::MAX` to effectively limit the number of row groups to 1 .set_max_row_group_size(usize::MAX) + // this is a global setting, will be covered by the column level setting(if set) .set_encoding(Encoding::PLAIN) + // ditto .set_dictionary_enabled(false) .set_statistics_enabled(EnabledStatistics::None) - .set_bloom_filter_enabled(false) - .build(); + .set_bloom_filter_enabled(false); + if blocks.len() == 1 { + // doesn't not cover the case of multiple blocks for now. to simplify the implementation + props_builder = choose_compression_scheme(props_builder, &blocks[0], table_schema, stat)?; + } + let props = props_builder.build(); let batches = blocks .into_iter() .map(|block| block.to_record_batch(table_schema)) @@ -56,3 +67,35 @@ pub fn blocks_to_parquet( let file_meta = writer.close()?; Ok(file_meta) } + +fn choose_compression_scheme( + mut props: WriterPropertiesBuilder, + block: &DataBlock, + table_schema: &TableSchema, + stat: &StatisticsOfColumns, +) -> Result { + // These parameters have not been finely tuned. 
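+    // Intuition: dictionary encoding only pays off when each distinct value
+    // repeats often enough, i.e. when num_rows / NDV exceeds the threshold below.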
+ const ENABLE_DICT_THRESHOLD: f64 = 5.0; + + let num_rows = block.num_rows(); + + for (field, _col) in table_schema.fields().iter().zip(block.columns()) { + if field.is_nested() { + // skip nested fields for now, to simplify the implementation + continue; + } + let col_id = field.column_id(); + if let Some(col_stat) = stat.get(&col_id) { + if col_stat + .distinct_of_values + .is_some_and(|ndv| num_rows as f64 / ndv as f64 > ENABLE_DICT_THRESHOLD) + { + let col_path = ColumnPath::new(vec![field.name().clone()]); + props = props + .set_column_dictionary_enabled(col_path.clone(), true) + .set_column_compression(col_path, Compression::UNCOMPRESSED); + } + } + } + Ok(props) +} diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs index 9ee13be51304c..e09c0f14bc4af 100644 --- a/src/query/storages/fuse/src/io/write/block_writer.rs +++ b/src/query/storages/fuse/src/io/write/block_writer.rs @@ -34,6 +34,7 @@ use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ClusterStatistics; use databend_storages_common_table_meta::meta::ColumnMeta; use databend_storages_common_table_meta::meta::Location; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; use opendal::Operator; @@ -43,19 +44,24 @@ use crate::operations::column_parquet_metas; use crate::statistics::gen_columns_statistics; use crate::statistics::ClusterStatsGenerator; use crate::FuseStorageFormat; - // TODO rename this, it is serialization, or pass in a writer(if not rename) pub fn serialize_block( write_settings: &WriteSettings, schema: &TableSchemaRef, block: DataBlock, buf: &mut Vec, + stat: &StatisticsOfColumns, ) -> Result> { let schema = Arc::new(schema.remove_virtual_computed_fields()); match write_settings.storage_format { FuseStorageFormat::Parquet => { - let result = - blocks_to_parquet(&schema, vec![block], buf, write_settings.table_compression)?; + let result = blocks_to_parquet( + &schema, + vec![block], + buf, + write_settings.table_compression, + stat, + )?; let meta = column_parquet_metas(&result, &schema)?; Ok(meta) } @@ -137,6 +143,7 @@ impl BloomIndexState { vec![index_block], &mut data, TableCompression::None, + &Default::default(), )?; let data_size = data.len() as u64; Ok(Some(Self { @@ -197,6 +204,7 @@ impl BlockBuilder { &self.source_schema, data_block, &mut buffer, + &col_stats, )?; let file_size = buffer.len() as u64; let block_meta = BlockMeta { diff --git a/src/query/storages/fuse/src/operations/agg_index_sink.rs b/src/query/storages/fuse/src/operations/agg_index_sink.rs index 3cff22409eae1..51b3a1bc4fe90 100644 --- a/src/query/storages/fuse/src/operations/agg_index_sink.rs +++ b/src/query/storages/fuse/src/operations/agg_index_sink.rs @@ -111,7 +111,13 @@ impl AsyncSink for AggIndexSink { self.index_id, ); let mut data = vec![]; - io::serialize_block(&self.write_settings, &self.sink_schema, block, &mut data)?; + io::serialize_block( + &self.write_settings, + &self.sink_schema, + block, + &mut data, + &Default::default(), + )?; { metrics_inc_agg_index_write_nums(1); diff --git a/src/query/storages/result_cache/src/write/writer.rs b/src/query/storages/result_cache/src/write/writer.rs index 0ba10167d2762..b3aeffbc5bb38 100644 --- a/src/query/storages/result_cache/src/write/writer.rs +++ b/src/query/storages/result_cache/src/write/writer.rs @@ -77,6 +77,7 @@ impl ResultCacheWriter { self.blocks.clone(), &mut buf, 
TableCompression::None, + &Default::default(), )?; let file_location = format!("{}/{}.parquet", self.location, Uuid::new_v4().as_simple()); From 4866732cbbf5d06ebfe2ef1437c5e075d217b12a Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 11 Apr 2024 22:06:03 +0800 Subject: [PATCH 02/11] fix compression --- src/query/storages/common/blocks/src/parquet_rs.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 0056dee91b0ae..35ee3a91a3f5c 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -91,9 +91,10 @@ fn choose_compression_scheme( .is_some_and(|ndv| num_rows as f64 / ndv as f64 > ENABLE_DICT_THRESHOLD) { let col_path = ColumnPath::new(vec![field.name().clone()]); - props = props - .set_column_dictionary_enabled(col_path.clone(), true) - .set_column_compression(col_path, Compression::UNCOMPRESSED); + props = props.set_column_dictionary_enabled(col_path.clone(), true); + // TODO: figure out the benefit of disable compression when encoding is effective. + // Currently, compression type recorded in block meta is in file level, not column level. + // .set_column_compression(col_path, Compression::UNCOMPRESSED); } } } From 69dd29056fcbc89a33683d534278efe71cf438f1 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 11 Apr 2024 22:27:31 +0800 Subject: [PATCH 03/11] make lint --- src/query/storages/common/blocks/src/parquet_rs.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 35ee3a91a3f5c..3e3481fdeff9d 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -21,7 +21,6 @@ use databend_common_expression::TableSchema; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; use parquet_rs::arrow::ArrowWriter; -use parquet_rs::basic::Compression; use parquet_rs::basic::Encoding; use parquet_rs::file::properties::EnabledStatistics; use parquet_rs::file::properties::WriterProperties; From 2ec55aea4c040629663e7fdf6aa39792e709cdce Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 6 May 2024 15:11:53 +0800 Subject: [PATCH 04/11] try enable dict encoding --- Cargo.lock | 1 + src/query/storages/common/blocks/Cargo.toml | 1 + .../storages/common/blocks/src/parquet_rs.rs | 88 +++++++++++++------ 3 files changed, 65 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 345399bc15ab4..5a6fdbe323c59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4795,6 +4795,7 @@ dependencies = [ name = "databend-storages-common-blocks" version = "0.1.0" dependencies = [ + "arrow-schema 50.0.0", "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml index ee74f856b6f9d..49b835a88ffd8 100644 --- a/src/query/storages/common/blocks/Cargo.toml +++ b/src/query/storages/common/blocks/Cargo.toml @@ -12,6 +12,7 @@ doctest = false test = false [dependencies] +arrow-schema = { workspace = true } databend-common-exception = { path = "../../../../common/exception" } databend-common-expression = { path = "../../../expression" } parquet_rs = { workspace = true } diff --git 
a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 3e3481fdeff9d..840fbdce63cd6 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -16,17 +16,22 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::converts::arrow::table_schema_to_arrow_schema_ignore_inside_nullable; +use databend_common_expression::BlockEntry; use databend_common_expression::DataBlock; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; +use parquet_rs::arrow::arrow_to_parquet_schema; use parquet_rs::arrow::ArrowWriter; use parquet_rs::basic::Encoding; +use parquet_rs::basic::Type as PhysicalType; use parquet_rs::file::properties::EnabledStatistics; use parquet_rs::file::properties::WriterProperties; use parquet_rs::file::properties::WriterPropertiesBuilder; use parquet_rs::format::FileMetaData; use parquet_rs::schema::types::ColumnPath; +use parquet_rs::schema::types::Type; +use parquet_rs::schema::types::TypePtr; /// Serialize data blocks to parquet format. pub fn blocks_to_parquet( @@ -47,18 +52,26 @@ pub fn blocks_to_parquet( .set_dictionary_enabled(false) .set_statistics_enabled(EnabledStatistics::None) .set_bloom_filter_enabled(false); + let arrow_schema = Arc::new(table_schema_to_arrow_schema_ignore_inside_nullable( + table_schema, + )); + let parquet_schema = arrow_to_parquet_schema(&arrow_schema)?; if blocks.len() == 1 { // doesn't not cover the case of multiple blocks for now. to simplify the implementation - props_builder = choose_compression_scheme(props_builder, &blocks[0], table_schema, stat)?; + props_builder = choose_compression_scheme( + props_builder, + &blocks[0], + parquet_schema.root_schema().get_fields(), + table_schema, + stat, + )?; } let props = props_builder.build(); let batches = blocks .into_iter() .map(|block| block.to_record_batch(table_schema)) .collect::>>()?; - let arrow_schema = Arc::new(table_schema_to_arrow_schema_ignore_inside_nullable( - table_schema, - )); + let mut writer = ArrowWriter::try_new(write_buffer, arrow_schema, Some(props))?; for batch in batches { writer.write(&batch)?; @@ -70,32 +83,57 @@ pub fn blocks_to_parquet( fn choose_compression_scheme( mut props: WriterPropertiesBuilder, block: &DataBlock, + parquet_fields: &[TypePtr], table_schema: &TableSchema, stat: &StatisticsOfColumns, ) -> Result { - // These parameters have not been finely tuned. - const ENABLE_DICT_THRESHOLD: f64 = 5.0; - - let num_rows = block.num_rows(); - - for (field, _col) in table_schema.fields().iter().zip(block.columns()) { - if field.is_nested() { - // skip nested fields for now, to simplify the implementation - continue; - } - let col_id = field.column_id(); - if let Some(col_stat) = stat.get(&col_id) { - if col_stat - .distinct_of_values - .is_some_and(|ndv| num_rows as f64 / ndv as f64 > ENABLE_DICT_THRESHOLD) - { - let col_path = ColumnPath::new(vec![field.name().clone()]); - props = props.set_column_dictionary_enabled(col_path.clone(), true); - // TODO: figure out the benefit of disable compression when encoding is effective. - // Currently, compression type recorded in block meta is in file level, not column level. 
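+            // (A dictionary page that deduplicates well may already be close
+            // to incompressible, so compressing it again mostly costs CPU.)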
- // .set_column_compression(col_path, Compression::UNCOMPRESSED); + for ((parquet_field, table_field), col) in parquet_fields + .iter() + .zip(table_schema.fields.iter()) + .zip(block.columns()) + { + match parquet_field.as_ref() { + Type::PrimitiveType { + basic_info: _, + physical_type, + type_length: _, + scale: _, + precision: _, + } => { + let distinct_of_values = stat + .get(&table_field.column_id) + .and_then(|stat| stat.distinct_of_values); + let num_rows = block.num_rows(); + if can_apply_dict_encoding(physical_type, distinct_of_values, num_rows, col)? { + let col_path = ColumnPath::new(vec![table_field.name().clone()]); + props = props.set_column_dictionary_enabled(col_path, true); + } } + Type::GroupType { + basic_info: _, + fields: _, + } => {} // TODO: handle nested fields } } Ok(props) } + +fn can_apply_dict_encoding( + physical_type: &PhysicalType, + distinct_of_values: Option, + num_rows: usize, + col: &BlockEntry, +) -> Result { + const LOW_CARDINALITY_THRESHOLD: f64 = 10.0; + const AVG_BYTES_PER_VALUE: f64 = 10.0; + if !matches!(physical_type, PhysicalType::BYTE_ARRAY) { + return Ok(false); + } + let is_low_cardinality = distinct_of_values + .is_some_and(|ndv| num_rows as f64 / ndv as f64 > LOW_CARDINALITY_THRESHOLD); + let column = col.value.convert_to_full_column(&col.data_type, num_rows); + let memory_size = column.memory_size(); + let total_bytes = memory_size - num_rows * 8; + let avg_bytes_per_value = total_bytes as f64 / num_rows as f64; + Ok(is_low_cardinality && avg_bytes_per_value < AVG_BYTES_PER_VALUE) +} From 9bfff0c32ced670c82190be129d451ace178e8b4 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 6 May 2024 15:15:58 +0800 Subject: [PATCH 05/11] fix check --- Cargo.lock | 1 - src/query/storages/common/blocks/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a6fdbe323c59..345399bc15ab4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4795,7 +4795,6 @@ dependencies = [ name = "databend-storages-common-blocks" version = "0.1.0" dependencies = [ - "arrow-schema 50.0.0", "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml index 49b835a88ffd8..ee74f856b6f9d 100644 --- a/src/query/storages/common/blocks/Cargo.toml +++ b/src/query/storages/common/blocks/Cargo.toml @@ -12,7 +12,6 @@ doctest = false test = false [dependencies] -arrow-schema = { workspace = true } databend-common-exception = { path = "../../../../common/exception" } databend-common-expression = { path = "../../../expression" } parquet_rs = { workspace = true } From 621da2dda4943f2683f3bfd022c74bbb8620bbaf Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 6 May 2024 21:01:03 +0800 Subject: [PATCH 06/11] try enable delte binary packed encoding --- Cargo.lock | 1 + src/query/storages/common/blocks/Cargo.toml | 4 +- .../storages/common/blocks/src/parquet_rs.rs | 200 ++++++++++++++++++ 3 files changed, 203 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 345399bc15ab4..d30a980cfad88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4798,6 +4798,7 @@ dependencies = [ "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", + "ethnum 1.5.0 (git+https://github.com/ariesdevil/ethnum-rs?rev=4cb05f1)", "parquet", ] diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml index 
ee74f856b6f9d..074764d6e986b 100644 --- a/src/query/storages/common/blocks/Cargo.toml +++ b/src/query/storages/common/blocks/Cargo.toml @@ -14,8 +14,8 @@ test = false [dependencies] databend-common-exception = { path = "../../../../common/exception" } databend-common-expression = { path = "../../../expression" } -parquet_rs = { workspace = true } - databend-storages-common-table-meta = { path = "../table_meta" } +ethnum = { workspace = true } +parquet_rs = { workspace = true } [build-dependencies] diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 840fbdce63cd6..e9d70a45246bd 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -14,13 +14,18 @@ use std::sync::Arc; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::converts::arrow::table_schema_to_arrow_schema_ignore_inside_nullable; +use databend_common_expression::types::DataType; +use databend_common_expression::types::DecimalDataType; +use databend_common_expression::types::NumberDataType; use databend_common_expression::BlockEntry; use databend_common_expression::DataBlock; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; +use ethnum::I256; use parquet_rs::arrow::arrow_to_parquet_schema; use parquet_rs::arrow::ArrowWriter; use parquet_rs::basic::Encoding; @@ -107,6 +112,9 @@ fn choose_compression_scheme( if can_apply_dict_encoding(physical_type, distinct_of_values, num_rows, col)? { let col_path = ColumnPath::new(vec![table_field.name().clone()]); props = props.set_column_dictionary_enabled(col_path, true); + } else if can_apply_delta_binary_pack(physical_type, col, num_rows)? 
{ + let col_path = ColumnPath::new(vec![table_field.name().clone()]); + props = props.set_column_encoding(col_path, Encoding::DELTA_BINARY_PACKED); } } Type::GroupType { @@ -137,3 +145,195 @@ fn can_apply_dict_encoding( let avg_bytes_per_value = total_bytes as f64 / num_rows as f64; Ok(is_low_cardinality && avg_bytes_per_value < AVG_BYTES_PER_VALUE) } + +fn can_apply_delta_binary_pack( + physical_type: &PhysicalType, + col: &BlockEntry, + num_rows: usize, +) -> Result { + const MAX_DELTA: i64 = 1 << 3; + if !matches!( + physical_type, + PhysicalType::INT32 | PhysicalType::INT64 | PhysicalType::INT96 + ) { + return Ok(false); + } + if num_rows == 0 { + return Ok(false); + } + let col = col.value.convert_to_full_column(&col.data_type, num_rows); + match col.data_type().remove_nullable() { + DataType::Number(NumberDataType::UInt8) => { + let mut max_delta = 0; + let col = col.as_number().unwrap().as_u_int8().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta as i64 <= MAX_DELTA) + } + DataType::Number(NumberDataType::UInt16) => { + let mut max_delta = 0; + let col = col.as_number().unwrap().as_u_int16().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta as i64 <= MAX_DELTA) + } + DataType::Number(NumberDataType::UInt32) => { + let mut max_delta = 0; + let col = col.as_number().unwrap().as_u_int32().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta as i64 <= MAX_DELTA) + } + DataType::Number(NumberDataType::UInt64) => { + let mut max_delta = 0; + let col = col.as_number().unwrap().as_u_int64().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta as i64 <= MAX_DELTA) + } + DataType::Number(NumberDataType::Int8) => { + let mut max_delta = 0; + let col = col.as_number().unwrap().as_int8().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta as i64 <= MAX_DELTA) + } + DataType::Number(NumberDataType::Int16) => { + let mut max_delta = 0; + let col = col.as_number().unwrap().as_int16().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta as i64 <= MAX_DELTA) + } + DataType::Number(NumberDataType::Int32) => { + let mut max_delta = 0; + let col = col.as_number().unwrap().as_int32().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta as i64 <= 
MAX_DELTA) + } + DataType::Number(NumberDataType::Int64) => { + let mut max_delta = 0; + let col = col.as_number().unwrap().as_int64().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta <= MAX_DELTA) + } + DataType::Decimal(DecimalDataType::Decimal128(_)) => { + let mut max_delta = 0; + let (col, _) = col.as_decimal().unwrap().as_decimal128().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta <= MAX_DELTA as i128) + } + DataType::Decimal(DecimalDataType::Decimal256(_)) => { + let mut max_delta: I256 = I256::ZERO; + let (col, _) = col.as_decimal().unwrap().as_decimal256().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta <= I256::from(MAX_DELTA)) + } + DataType::Timestamp => { + let mut max_delta = 0; + let col = col.as_timestamp().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta <= MAX_DELTA) + } + DataType::Date => { + let mut max_delta = 0; + let col = col.as_date().unwrap(); + let mut col_iter = col.iter(); + let mut prev = *col_iter.next().unwrap(); + for &v in col_iter { + let delta = if v > prev { v - prev } else { prev - v }; + if delta > max_delta { + max_delta = delta; + } + prev = v; + } + Ok(max_delta as i64 <= MAX_DELTA) + } + _ => Err(ErrorCode::Internal(format!( + "Unsupported data type for delta binary pack: {:?}", + col.data_type() + ))), + } +} From 2f5e9a63b8cf7abed7fc327dfc975be448d7f371 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 6 May 2024 21:45:53 +0800 Subject: [PATCH 07/11] fix nullable --- .../storages/common/blocks/src/parquet_rs.rs | 77 +++++++++++++++---- 1 file changed, 64 insertions(+), 13 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index e9d70a45246bd..0c9e5083e92f6 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -161,10 +161,14 @@ fn can_apply_delta_binary_pack( if num_rows == 0 { return Ok(false); } - let col = col.value.convert_to_full_column(&col.data_type, num_rows); + let col = col + .value + .convert_to_full_column(&col.data_type, num_rows) + .remove_nullable(); match col.data_type().remove_nullable() { DataType::Number(NumberDataType::UInt8) => { let mut max_delta = 0; + let mut min_delta = u8::MAX; let col = col.as_number().unwrap().as_u_int8().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -173,12 +177,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta as i64 <= MAX_DELTA) + Ok((max_delta - min_delta) as i64 <= MAX_DELTA) } DataType::Number(NumberDataType::UInt16) => { let mut max_delta = 0; + let mut min_delta = u16::MAX; let col = 
col.as_number().unwrap().as_u_int16().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -187,12 +195,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta as i64 <= MAX_DELTA) + Ok((max_delta - min_delta) as i64 <= MAX_DELTA) } DataType::Number(NumberDataType::UInt32) => { let mut max_delta = 0; + let mut min_delta = u32::MAX; let col = col.as_number().unwrap().as_u_int32().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -201,12 +213,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta as i64 <= MAX_DELTA) + Ok((max_delta - min_delta) as i64 <= MAX_DELTA) } DataType::Number(NumberDataType::UInt64) => { let mut max_delta = 0; + let mut min_delta = u64::MAX; let col = col.as_number().unwrap().as_u_int64().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -215,12 +231,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta as i64 <= MAX_DELTA) + Ok((max_delta - min_delta) <= MAX_DELTA as u64) } DataType::Number(NumberDataType::Int8) => { let mut max_delta = 0; + let mut min_delta = i8::MAX; let col = col.as_number().unwrap().as_int8().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -229,12 +249,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta as i64 <= MAX_DELTA) + Ok((max_delta - min_delta) as i64 <= MAX_DELTA) } DataType::Number(NumberDataType::Int16) => { let mut max_delta = 0; + let mut min_delta = i16::MAX; let col = col.as_number().unwrap().as_int16().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -243,12 +267,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta as i64 <= MAX_DELTA) + Ok((max_delta - min_delta) as i64 <= MAX_DELTA) } DataType::Number(NumberDataType::Int32) => { let mut max_delta = 0; + let mut min_delta = i32::MAX; let col = col.as_number().unwrap().as_int32().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -257,12 +285,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta as i64 <= MAX_DELTA) + Ok((max_delta - min_delta) as i64 <= MAX_DELTA) } DataType::Number(NumberDataType::Int64) => { let mut max_delta = 0; + let mut min_delta = i64::MAX; let col = col.as_number().unwrap().as_int64().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -271,12 +303,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta <= MAX_DELTA) + Ok((max_delta - min_delta) <= MAX_DELTA) } DataType::Decimal(DecimalDataType::Decimal128(_)) => { let mut max_delta = 0; + let mut min_delta = i128::MAX; let (col, _) = col.as_decimal().unwrap().as_decimal128().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -285,12 +321,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { 
max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta <= MAX_DELTA as i128) + Ok((max_delta - min_delta) <= MAX_DELTA as i128) } DataType::Decimal(DecimalDataType::Decimal256(_)) => { let mut max_delta: I256 = I256::ZERO; + let mut min_delta = I256::MAX; let (col, _) = col.as_decimal().unwrap().as_decimal256().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -299,12 +339,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta <= I256::from(MAX_DELTA)) + Ok(max_delta - min_delta <= I256::from(MAX_DELTA)) } DataType::Timestamp => { let mut max_delta = 0; + let mut min_delta = i64::MAX; let col = col.as_timestamp().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -313,12 +357,16 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta <= MAX_DELTA) + Ok((max_delta - min_delta) <= MAX_DELTA) } DataType::Date => { let mut max_delta = 0; + let mut min_delta = i32::MAX; let col = col.as_date().unwrap(); let mut col_iter = col.iter(); let mut prev = *col_iter.next().unwrap(); @@ -327,9 +375,12 @@ fn can_apply_delta_binary_pack( if delta > max_delta { max_delta = delta; } + if delta < min_delta { + min_delta = delta; + } prev = v; } - Ok(max_delta as i64 <= MAX_DELTA) + Ok((max_delta - min_delta) as i64 <= MAX_DELTA) } _ => Err(ErrorCode::Internal(format!( "Unsupported data type for delta binary pack: {:?}", From fbb7592de01578a3246831a97650fe35702806a2 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 10 May 2024 18:22:11 +0800 Subject: [PATCH 08/11] test dict only --- .../storages/common/blocks/src/parquet_rs.rs | 49 ++++++------------- 1 file changed, 16 insertions(+), 33 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 0c9e5083e92f6..e9203a16f1dfd 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -92,7 +92,7 @@ fn choose_compression_scheme( table_schema: &TableSchema, stat: &StatisticsOfColumns, ) -> Result { - for ((parquet_field, table_field), col) in parquet_fields + for ((parquet_field, table_field), _col) in parquet_fields .iter() .zip(table_schema.fields.iter()) .zip(block.columns()) @@ -104,19 +104,21 @@ fn choose_compression_scheme( type_length: _, scale: _, precision: _, - } => { - let distinct_of_values = stat - .get(&table_field.column_id) - .and_then(|stat| stat.distinct_of_values); - let num_rows = block.num_rows(); - if can_apply_dict_encoding(physical_type, distinct_of_values, num_rows, col)? { - let col_path = ColumnPath::new(vec![table_field.name().clone()]); - props = props.set_column_dictionary_enabled(col_path, true); - } else if can_apply_delta_binary_pack(physical_type, col, num_rows)? 
{ - let col_path = ColumnPath::new(vec![table_field.name().clone()]); - props = props.set_column_encoding(col_path, Encoding::DELTA_BINARY_PACKED); + } => match physical_type { + PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => { + let ndv = stat + .get(&table_field.column_id) + .and_then(|stat| stat.distinct_of_values); + let num_rows = block.num_rows(); + if let Some(ndv) = ndv { + if num_rows as f64 / ndv as f64 > 10.0 { + let col_path = ColumnPath::new(vec![table_field.name().clone()]); + props = props.set_column_dictionary_enabled(col_path, true); + } + } } - } + _ => {} + }, Type::GroupType { basic_info: _, fields: _, @@ -126,26 +128,7 @@ fn choose_compression_scheme( Ok(props) } -fn can_apply_dict_encoding( - physical_type: &PhysicalType, - distinct_of_values: Option, - num_rows: usize, - col: &BlockEntry, -) -> Result { - const LOW_CARDINALITY_THRESHOLD: f64 = 10.0; - const AVG_BYTES_PER_VALUE: f64 = 10.0; - if !matches!(physical_type, PhysicalType::BYTE_ARRAY) { - return Ok(false); - } - let is_low_cardinality = distinct_of_values - .is_some_and(|ndv| num_rows as f64 / ndv as f64 > LOW_CARDINALITY_THRESHOLD); - let column = col.value.convert_to_full_column(&col.data_type, num_rows); - let memory_size = column.memory_size(); - let total_bytes = memory_size - num_rows * 8; - let avg_bytes_per_value = total_bytes as f64 / num_rows as f64; - Ok(is_low_cardinality && avg_bytes_per_value < AVG_BYTES_PER_VALUE) -} - +#[allow(dead_code)] fn can_apply_delta_binary_pack( physical_type: &PhysicalType, col: &BlockEntry, From 38d8bab188cf211c52f822ab870a1ffb6db36645 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 13 May 2024 10:43:01 +0800 Subject: [PATCH 09/11] refactor --- Cargo.lock | 3 + src/query/expression/src/converts/arrow/to.rs | 5 +- src/query/storages/common/blocks/Cargo.toml | 3 + .../common/blocks/src/codec/byte_array.rs | 90 ++++++ .../common/blocks/src/codec/choose.rs | 60 ++++ .../storages/common/blocks/src/codec/int.rs | 85 +++++ .../storages/common/blocks/src/codec/mod.rs | 19 ++ src/query/storages/common/blocks/src/lib.rs | 1 + .../storages/common/blocks/src/parquet_rs.rs | 302 +----------------- 9 files changed, 266 insertions(+), 302 deletions(-) create mode 100644 src/query/storages/common/blocks/src/codec/byte_array.rs create mode 100644 src/query/storages/common/blocks/src/codec/choose.rs create mode 100644 src/query/storages/common/blocks/src/codec/int.rs create mode 100644 src/query/storages/common/blocks/src/codec/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 98f2fb22d56b0..7cdb1b8514790 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5067,6 +5067,9 @@ dependencies = [ name = "databend-storages-common-blocks" version = "0.1.0" dependencies = [ + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-schema 50.0.0", "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs index 47f2211d70cb6..279d44524bf56 100644 --- a/src/query/expression/src/converts/arrow/to.rs +++ b/src/query/expression/src/converts/arrow/to.rs @@ -170,8 +170,7 @@ impl DataBlock { impl Column { pub fn into_arrow_rs(self) -> Arc { let arrow2_array: Box = self.as_arrow(); - let arrow_array: Arc = arrow2_array.into(); - arrow_array + arrow2_array.into() } } @@ -200,7 +199,7 @@ fn arrow_field_from_arrow2_field(field: Arrow2Field) -> ArrowField { Arrow2DataType::Struct(f) => { 
ArrowDataType::Struct(f.into_iter().map(arrow_field_from_arrow2_field).collect()) } - other => other.into(), + other => ArrowDataType::from(other), }; ArrowField::new(field.name, data_type, field.is_nullable).with_metadata(metadata) diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml index 6c8f5314b31ce..916bf4ff0c310 100644 --- a/src/query/storages/common/blocks/Cargo.toml +++ b/src/query/storages/common/blocks/Cargo.toml @@ -12,6 +12,9 @@ doctest = false test = true [dependencies] +arrow-array = { workspace = true } +arrow-buffer = { workspace = true } +arrow-schema = { workspace = true } databend-common-exception = { path = "../../../../common/exception" } databend-common-expression = { path = "../../../expression" } databend-storages-common-table-meta = { path = "../table_meta" } diff --git a/src/query/storages/common/blocks/src/codec/byte_array.rs b/src/query/storages/common/blocks/src/codec/byte_array.rs new file mode 100644 index 0000000000000..7002fb583f77b --- /dev/null +++ b/src/query/storages/common/blocks/src/codec/byte_array.rs @@ -0,0 +1,90 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow_array::types::ByteArrayType; +use arrow_array::types::LargeBinaryType; +use arrow_array::types::LargeUtf8Type; +use arrow_array::Array; +use arrow_array::GenericByteArray; +use arrow_schema::DataType as ArrowDataType; +use databend_common_exception::Result; +use databend_storages_common_table_meta::meta::ColumnStatistics; +use parquet_rs::basic::Encoding; +use parquet_rs::file::properties::WriterPropertiesBuilder; +use parquet_rs::schema::types::ColumnPath; + +const ROWS_PER_DISTINCT_THRESHOLD: f64 = 10.0; +const SAMPLE_ROWS: usize = 1000; +const AVERAGE_PREFIX_LEN_THRESHOLD: f64 = 8.0; + +pub fn choose_byte_array_encoding( + mut props: WriterPropertiesBuilder, + stat: Option<&ColumnStatistics>, + array: Arc, + column_name: &str, +) -> Result { + if array.is_empty() { + return Ok(props); + } + let col_path = ColumnPath::new(vec![column_name.to_string()]); + let ndv = stat.as_ref().and_then(|s| s.distinct_of_values); + let num_rows = array.len(); + if let Some(ndv) = ndv { + if num_rows as f64 / ndv as f64 > ROWS_PER_DISTINCT_THRESHOLD { + props = props.set_column_dictionary_enabled(col_path, true); + return Ok(props); + } + } + let data_type = array.data_type(); + match data_type { + ArrowDataType::LargeBinary => { + if can_apply_delta_byte_array::(&array)? { + props = props.set_column_encoding(col_path, Encoding::DELTA_BYTE_ARRAY); + return Ok(props); + } + } + ArrowDataType::LargeUtf8 => { + if can_apply_delta_byte_array::(&array)? 
{ + props = props.set_column_encoding(col_path, Encoding::DELTA_BYTE_ARRAY); + return Ok(props); + } + } + _ => {} + }; + props = props.set_column_encoding(col_path, Encoding::DELTA_LENGTH_BYTE_ARRAY); + Ok(props) +} + +fn can_apply_delta_byte_array(array: &dyn Array) -> Result { + let num_rows = array.len(); + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + let mut sum_prefix_len = 0; + for i in 1..num_rows.min(SAMPLE_ROWS) { + let last: &[u8] = array.value(i - 1).as_ref(); + let cur: &[u8] = array.value(i).as_ref(); + let prefix_len = last + .iter() + .zip(cur.iter()) + .take_while(|(a, b)| a == b) + .count(); + sum_prefix_len += prefix_len; + } + let avg_prefix_len = sum_prefix_len as f64 / num_rows as f64; + Ok(avg_prefix_len > AVERAGE_PREFIX_LEN_THRESHOLD) +} diff --git a/src/query/storages/common/blocks/src/codec/choose.rs b/src/query/storages/common/blocks/src/codec/choose.rs new file mode 100644 index 0000000000000..c6b0ba8b742c4 --- /dev/null +++ b/src/query/storages/common/blocks/src/codec/choose.rs @@ -0,0 +1,60 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_expression::TableSchema; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; +use parquet_rs::basic::Type as PhysicalType; +use parquet_rs::file::properties::WriterPropertiesBuilder; +use parquet_rs::schema::types::Type; +use parquet_rs::schema::types::TypePtr; + +use super::byte_array::choose_byte_array_encoding; +use super::int::choose_int_encoding; + +pub fn choose_codec( + mut props: WriterPropertiesBuilder, + block: &DataBlock, + parquet_fields: &[TypePtr], + table_schema: &TableSchema, + stat: &StatisticsOfColumns, +) -> Result { + for ((parquet_field, table_field), entry) in parquet_fields + .iter() + .zip(table_schema.fields.iter()) + .zip(block.columns()) + { + let column = entry.to_column(block.num_rows()); + let array = column.into_arrow_rs(); + let stat = stat.get(&table_field.column_id); + let column_name = table_field.name.as_str(); + match parquet_field.as_ref() { + Type::PrimitiveType { physical_type, .. } => match physical_type { + PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => { + props = choose_byte_array_encoding(props, stat, array, column_name)?; + } + PhysicalType::INT32 | PhysicalType::INT64 | PhysicalType::INT96 => { + props = choose_int_encoding(props, stat, array, column_name)? 
+ } + _ => {} + }, + Type::GroupType { + basic_info: _, + fields: _, + } => {} // TODO: handle nested fields + } + } + Ok(props) +} diff --git a/src/query/storages/common/blocks/src/codec/int.rs b/src/query/storages/common/blocks/src/codec/int.rs new file mode 100644 index 0000000000000..f405b5d0c7ee6 --- /dev/null +++ b/src/query/storages/common/blocks/src/codec/int.rs @@ -0,0 +1,85 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow_array::types::ArrowPrimitiveType; +use arrow_array::types::Date32Type; +use arrow_array::types::Decimal128Type; +use arrow_array::types::Decimal256Type; +use arrow_array::types::Int32Type; +use arrow_array::types::Int64Type; +use arrow_array::types::TimestampMicrosecondType; +use arrow_array::types::UInt32Type; +use arrow_array::types::UInt64Type; +use arrow_array::Array; +use arrow_array::ArrowNativeTypeOp; +use arrow_array::PrimitiveArray; +use arrow_buffer::ArrowNativeType; +use arrow_schema::DataType as ArrowDataType; +use arrow_schema::TimeUnit; +use databend_common_exception::Result; +use databend_storages_common_table_meta::meta::ColumnStatistics; +use parquet_rs::basic::Encoding; +use parquet_rs::file::properties::WriterPropertiesBuilder; +use parquet_rs::schema::types::ColumnPath; + +const MAX_WIDTH_THRESHOLD: i64 = 3; + +pub fn choose_int_encoding( + mut props: WriterPropertiesBuilder, + _stat: Option<&ColumnStatistics>, + array: Arc, + column_name: &str, +) -> Result { + if array.is_empty() { + return Ok(props); + } + let col_path = ColumnPath::new(vec![column_name.to_string()]); + let data_type = array.data_type(); + let apply_delta = match data_type { + ArrowDataType::Int32 => can_apply_delta_binary_pack::(&array)?, + ArrowDataType::Int64 => can_apply_delta_binary_pack::(&array)?, + ArrowDataType::UInt32 => can_apply_delta_binary_pack::(&array)?, + ArrowDataType::UInt64 => can_apply_delta_binary_pack::(&array)?, + ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => { + can_apply_delta_binary_pack::(&array)? 
+ } + ArrowDataType::Date32 => can_apply_delta_binary_pack::(&array)?, + ArrowDataType::Decimal128(_, _) => can_apply_delta_binary_pack::(&array)?, + ArrowDataType::Decimal256(_, _) => can_apply_delta_binary_pack::(&array)?, + _ => false, + }; + if apply_delta { + props = props.set_column_encoding(col_path, Encoding::DELTA_BINARY_PACKED); + } + Ok(props) +} + +fn can_apply_delta_binary_pack(array: &dyn Array) -> Result { + let mut max_delta = T::Native::MIN_TOTAL_ORDER; + let mut min_delta = T::Native::MAX_TOTAL_ORDER; + let array = array.as_any().downcast_ref::>().unwrap(); + for i in 1..array.len() { + let delta = array.value(i).sub_wrapping(array.value(i - 1)); + if delta.is_gt(max_delta) { + max_delta = delta; + } + if delta.is_lt(min_delta) { + min_delta = delta; + } + } + let x = max_delta.sub_wrapping(min_delta).as_usize(); + Ok(x <= (1 << MAX_WIDTH_THRESHOLD)) +} diff --git a/src/query/storages/common/blocks/src/codec/mod.rs b/src/query/storages/common/blocks/src/codec/mod.rs new file mode 100644 index 0000000000000..11508ef2490a0 --- /dev/null +++ b/src/query/storages/common/blocks/src/codec/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod byte_array; +mod choose; +mod int; + +pub use choose::choose_codec; diff --git a/src/query/storages/common/blocks/src/lib.rs b/src/query/storages/common/blocks/src/lib.rs index 722d2d20c6d89..b80b30b552de8 100644 --- a/src/query/storages/common/blocks/src/lib.rs +++ b/src/query/storages/common/blocks/src/lib.rs @@ -14,5 +14,6 @@ #![allow(clippy::uninlined_format_args)] +mod codec; mod parquet_rs; pub use parquet_rs::blocks_to_parquet; diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index e9203a16f1dfd..13faab7e8b8e0 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -14,29 +14,20 @@ use std::sync::Arc; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::converts::arrow::table_schema_to_arrow_schema_ignore_inside_nullable; -use databend_common_expression::types::DataType; -use databend_common_expression::types::DecimalDataType; -use databend_common_expression::types::NumberDataType; -use databend_common_expression::BlockEntry; use databend_common_expression::DataBlock; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; -use ethnum::I256; use parquet_rs::arrow::arrow_to_parquet_schema; use parquet_rs::arrow::ArrowWriter; use parquet_rs::basic::Encoding; -use parquet_rs::basic::Type as PhysicalType; use parquet_rs::file::properties::EnabledStatistics; use parquet_rs::file::properties::WriterProperties; -use parquet_rs::file::properties::WriterPropertiesBuilder; use parquet_rs::format::FileMetaData; -use parquet_rs::schema::types::ColumnPath; -use 
parquet_rs::schema::types::Type; -use parquet_rs::schema::types::TypePtr; + +use crate::codec::choose_codec; /// Serialize data blocks to parquet format. pub fn blocks_to_parquet( @@ -63,7 +54,7 @@ pub fn blocks_to_parquet( let parquet_schema = arrow_to_parquet_schema(&arrow_schema)?; if blocks.len() == 1 { // doesn't not cover the case of multiple blocks for now. to simplify the implementation - props_builder = choose_compression_scheme( + props_builder = choose_codec( props_builder, &blocks[0], parquet_schema.root_schema().get_fields(), @@ -84,290 +75,3 @@ pub fn blocks_to_parquet( let file_meta = writer.close()?; Ok(file_meta) } - -fn choose_compression_scheme( - mut props: WriterPropertiesBuilder, - block: &DataBlock, - parquet_fields: &[TypePtr], - table_schema: &TableSchema, - stat: &StatisticsOfColumns, -) -> Result { - for ((parquet_field, table_field), _col) in parquet_fields - .iter() - .zip(table_schema.fields.iter()) - .zip(block.columns()) - { - match parquet_field.as_ref() { - Type::PrimitiveType { - basic_info: _, - physical_type, - type_length: _, - scale: _, - precision: _, - } => match physical_type { - PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => { - let ndv = stat - .get(&table_field.column_id) - .and_then(|stat| stat.distinct_of_values); - let num_rows = block.num_rows(); - if let Some(ndv) = ndv { - if num_rows as f64 / ndv as f64 > 10.0 { - let col_path = ColumnPath::new(vec![table_field.name().clone()]); - props = props.set_column_dictionary_enabled(col_path, true); - } - } - } - _ => {} - }, - Type::GroupType { - basic_info: _, - fields: _, - } => {} // TODO: handle nested fields - } - } - Ok(props) -} - -#[allow(dead_code)] -fn can_apply_delta_binary_pack( - physical_type: &PhysicalType, - col: &BlockEntry, - num_rows: usize, -) -> Result { - const MAX_DELTA: i64 = 1 << 3; - if !matches!( - physical_type, - PhysicalType::INT32 | PhysicalType::INT64 | PhysicalType::INT96 - ) { - return Ok(false); - } - if num_rows == 0 { - return Ok(false); - } - let col = col - .value - .convert_to_full_column(&col.data_type, num_rows) - .remove_nullable(); - match col.data_type().remove_nullable() { - DataType::Number(NumberDataType::UInt8) => { - let mut max_delta = 0; - let mut min_delta = u8::MAX; - let col = col.as_number().unwrap().as_u_int8().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) as i64 <= MAX_DELTA) - } - DataType::Number(NumberDataType::UInt16) => { - let mut max_delta = 0; - let mut min_delta = u16::MAX; - let col = col.as_number().unwrap().as_u_int16().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) as i64 <= MAX_DELTA) - } - DataType::Number(NumberDataType::UInt32) => { - let mut max_delta = 0; - let mut min_delta = u32::MAX; - let col = col.as_number().unwrap().as_u_int32().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - 
min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) as i64 <= MAX_DELTA) - } - DataType::Number(NumberDataType::UInt64) => { - let mut max_delta = 0; - let mut min_delta = u64::MAX; - let col = col.as_number().unwrap().as_u_int64().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) <= MAX_DELTA as u64) - } - DataType::Number(NumberDataType::Int8) => { - let mut max_delta = 0; - let mut min_delta = i8::MAX; - let col = col.as_number().unwrap().as_int8().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) as i64 <= MAX_DELTA) - } - DataType::Number(NumberDataType::Int16) => { - let mut max_delta = 0; - let mut min_delta = i16::MAX; - let col = col.as_number().unwrap().as_int16().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) as i64 <= MAX_DELTA) - } - DataType::Number(NumberDataType::Int32) => { - let mut max_delta = 0; - let mut min_delta = i32::MAX; - let col = col.as_number().unwrap().as_int32().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) as i64 <= MAX_DELTA) - } - DataType::Number(NumberDataType::Int64) => { - let mut max_delta = 0; - let mut min_delta = i64::MAX; - let col = col.as_number().unwrap().as_int64().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) <= MAX_DELTA) - } - DataType::Decimal(DecimalDataType::Decimal128(_)) => { - let mut max_delta = 0; - let mut min_delta = i128::MAX; - let (col, _) = col.as_decimal().unwrap().as_decimal128().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) <= MAX_DELTA as i128) - } - DataType::Decimal(DecimalDataType::Decimal256(_)) => { - let mut max_delta: I256 = I256::ZERO; - let mut min_delta = I256::MAX; - let (col, _) = col.as_decimal().unwrap().as_decimal256().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok(max_delta - min_delta <= 
I256::from(MAX_DELTA)) - } - DataType::Timestamp => { - let mut max_delta = 0; - let mut min_delta = i64::MAX; - let col = col.as_timestamp().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) <= MAX_DELTA) - } - DataType::Date => { - let mut max_delta = 0; - let mut min_delta = i32::MAX; - let col = col.as_date().unwrap(); - let mut col_iter = col.iter(); - let mut prev = *col_iter.next().unwrap(); - for &v in col_iter { - let delta = if v > prev { v - prev } else { prev - v }; - if delta > max_delta { - max_delta = delta; - } - if delta < min_delta { - min_delta = delta; - } - prev = v; - } - Ok((max_delta - min_delta) as i64 <= MAX_DELTA) - } - _ => Err(ErrorCode::Internal(format!( - "Unsupported data type for delta binary pack: {:?}", - col.data_type() - ))), - } -} From 7d3f44c3d8336b3f54edd85d56d4de5f6733517c Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 13 May 2024 11:08:55 +0800 Subject: [PATCH 10/11] remove unused dep --- Cargo.lock | 1 - src/query/storages/common/blocks/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7cdb1b8514790..f724d2820a0fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5073,7 +5073,6 @@ dependencies = [ "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", - "ethnum 1.5.0 (git+https://github.com/ariesdevil/ethnum-rs?rev=4cb05f1)", "parquet", ] diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml index 916bf4ff0c310..ac9cc4763e221 100644 --- a/src/query/storages/common/blocks/Cargo.toml +++ b/src/query/storages/common/blocks/Cargo.toml @@ -18,7 +18,6 @@ arrow-schema = { workspace = true } databend-common-exception = { path = "../../../../common/exception" } databend-common-expression = { path = "../../../expression" } databend-storages-common-table-meta = { path = "../table_meta" } -ethnum = { workspace = true } parquet_rs = { workspace = true } [build-dependencies] From 12a71f348223eb7760847bcf6008470921b478f1 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 13 May 2024 12:17:31 +0800 Subject: [PATCH 11/11] fix panic --- Cargo.lock | 6 +++--- src/query/storages/common/blocks/src/codec/choose.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8685014367d0..2a3334d7f5c6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5088,9 +5088,9 @@ dependencies = [ name = "databend-storages-common-blocks" version = "0.1.0" dependencies = [ - "arrow-array 50.0.0", - "arrow-buffer 50.0.0", - "arrow-schema 50.0.0", + "arrow-array 51.0.0", + "arrow-buffer 51.0.0", + "arrow-schema 51.0.0", "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", diff --git a/src/query/storages/common/blocks/src/codec/choose.rs b/src/query/storages/common/blocks/src/codec/choose.rs index c6b0ba8b742c4..a1a031fcf508d 100644 --- a/src/query/storages/common/blocks/src/codec/choose.rs +++ b/src/query/storages/common/blocks/src/codec/choose.rs @@ -42,10 +42,10 @@ pub fn choose_codec( let column_name = table_field.name.as_str(); match parquet_field.as_ref() { Type::PrimitiveType { physical_type, .. 
} => match physical_type {
-                PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                PhysicalType::BYTE_ARRAY => {
                     props = choose_byte_array_encoding(props, stat, array, column_name)?;
                 }
-                PhysicalType::INT32 | PhysicalType::INT64 | PhysicalType::INT96 => {
+                PhysicalType::INT32 | PhysicalType::INT64 => {
                     props = choose_int_encoding(props, stat, array, column_name)?
                 }
                 _ => {}
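Net effect of the series: `blocks_to_parquet` now inspects per-column statistics and array contents, enabling dictionary pages for low-cardinality byte-array columns and `DELTA_BINARY_PACKED` for integer-like columns with narrow deltas, on top of a global PLAIN/no-dictionary default. The sketch below shows how such per-column writer properties are applied with the upstream `parquet` and `arrow-array` crates (the repo aliases `parquet` as `parquet_rs`); the schema, data, and chosen encodings here are illustrative assumptions, not part of the patch.

```rust
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::basic::Encoding;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two hypothetical columns that trigger the two heuristics:
    // near-sorted integers and low-cardinality strings.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("country", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(schema.clone(), vec![
        Arc::new(Int64Array::from(vec![1, 2, 3, 5, 6, 8])),
        Arc::new(StringArray::from(vec!["CN", "CN", "US", "CN", "US", "CN"])),
    ])?;

    // Per-column overrides on top of a no-dictionary global default,
    // mirroring how the patch widens its conservative defaults column by column.
    let props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_column_encoding(
            ColumnPath::new(vec!["id".into()]),
            Encoding::DELTA_BINARY_PACKED,
        )
        .set_column_dictionary_enabled(ColumnPath::new(vec!["country".into()]), true)
        .build();

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    println!("wrote {} bytes of parquet", buf.len());
    Ok(())
}
```

Keeping dictionary encoding disabled globally and re-enabling it per column means the heuristics can only widen the conservative default chosen in patch 01, never fight it.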