diff --git a/src/query/formats/src/output_format/parquet.rs b/src/query/formats/src/output_format/parquet.rs
index 589a76250edba..86398eb3709c1 100644
--- a/src/query/formats/src/output_format/parquet.rs
+++ b/src/query/formats/src/output_format/parquet.rs
@@ -53,7 +53,8 @@ impl OutputFormat for ParquetOutputFormat {
             return Ok(vec![]);
         }
         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd)?;
+        // Unloading data, enable encoding unconditionally in this case, since ...
+        let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd, true)?;
         Ok(buf)
     }
 }
diff --git a/src/query/service/src/interpreters/common/table_option_validation.rs b/src/query/service/src/interpreters/common/table_option_validation.rs
index ab332a1dce642..b556fe308dde0 100644
--- a/src/query/service/src/interpreters/common/table_option_validation.rs
+++ b/src/query/service/src/interpreters/common/table_option_validation.rs
@@ -30,6 +30,7 @@ use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP;
 use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_VACUUM;
+use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING;
 use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
@@ -83,6 +84,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
     r.insert(OPT_KEY_TEMP_PREFIX);
     r.insert(OPT_KEY_SEGMENT_FORMAT);
     r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
+    r.insert(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING);
     r
 });
@@ -243,3 +245,19 @@ where
     }
     Ok(())
 }
+
+pub fn is_valid_fuse_parquet_encoding_opt(
+    options: &BTreeMap<String, String>,
+) -> databend_common_exception::Result<()> {
+    is_valid_bool_opt(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING, options)
+}
+
+fn is_valid_bool_opt(
+    key: &str,
+    options: &BTreeMap<String, String>,
+) -> databend_common_exception::Result<()> {
+    if let Some(value) = options.get(key) {
+        value.parse::<bool>()?;
+    }
+    Ok(())
+}
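Note (not part of the patch): `is_valid_bool_opt` defers to Rust's `bool: FromStr`, which accepts exactly the strings `"true"` and `"false"`, nothing else. A minimal standalone sketch of the behavior the validator inherits (the `check` helper is illustrative only):

~~~
// Illustrative stand-in for is_valid_bool_opt: a value is valid
// only if it parses as a Rust bool ("true"/"false", case-sensitive).
fn check(value: &str) -> Result<(), std::str::ParseBoolError> {
    value.parse::<bool>().map(|_| ())
}

fn main() {
    assert!(check("true").is_ok());
    assert!(check("false").is_ok());
    // Everything else is rejected, including "TRUE", "1", and "invalid";
    // this is the error the DDL validators below surface for a bad value.
    assert!(check("TRUE").is_err());
    assert!(check("1").is_err());
    assert!(check("invalid").is_err());
}
~~~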
diff --git a/src/query/service/src/interpreters/interpreter_table_create.rs b/src/query/service/src/interpreters/interpreter_table_create.rs
index 9e45aaee345c4..2eeb85d63df39 100644
--- a/src/query/service/src/interpreters/interpreter_table_create.rs
+++ b/src/query/service/src/interpreters/interpreter_table_create.rs
@@ -70,6 +70,7 @@ use crate::interpreters::common::table_option_validation::is_valid_bloom_index_c
 use crate::interpreters::common::table_option_validation::is_valid_change_tracking;
 use crate::interpreters::common::table_option_validation::is_valid_create_opt;
 use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
+use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_encoding_opt;
 use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
 use crate::interpreters::common::table_option_validation::is_valid_random_seed;
 use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
@@ -459,6 +460,8 @@ impl CreateTableInterpreter {
         is_valid_random_seed(&table_meta.options)?;
         // check table level data_retention_period_in_hours
         is_valid_data_retention_period(&table_meta.options)?;
+        // check enable_parquet_encoding
+        is_valid_fuse_parquet_encoding_opt(&table_meta.options)?;

         // Same as settings of FUSE_OPT_KEY_ENABLE_AUTO_VACUUM, expect value type is unsigned integer
         is_valid_option_of_type::<u64>(&table_meta.options, FUSE_OPT_KEY_ENABLE_AUTO_VACUUM)?;
diff --git a/src/query/service/src/interpreters/interpreter_table_set_options.rs b/src/query/service/src/interpreters/interpreter_table_set_options.rs
index d75f94772cddf..5429801880cf5 100644
--- a/src/query/service/src/interpreters/interpreter_table_set_options.rs
+++ b/src/query/service/src/interpreters/interpreter_table_set_options.rs
@@ -53,6 +53,7 @@ use crate::interpreters::common::table_option_validation::is_valid_block_per_seg
 use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
 use crate::interpreters::common::table_option_validation::is_valid_create_opt;
 use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
+use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_encoding_opt;
 use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
 use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
 use crate::interpreters::Interpreter;
@@ -90,6 +91,8 @@ impl Interpreter for SetOptionsInterpreter {
         is_valid_row_per_block(&self.plan.set_options)?;
         // check data_retention_period
         is_valid_data_retention_period(&self.plan.set_options)?;
+        // check enable_parquet_encoding
+        is_valid_fuse_parquet_encoding_opt(&self.plan.set_options)?;

         // check storage_format
         let error_str = "invalid opt for fuse table in alter table statement";
diff --git a/src/query/service/src/test_kits/block_writer.rs b/src/query/service/src/test_kits/block_writer.rs
index 7bd9b7bed0729..b11d069088bf6 100644
--- a/src/query/service/src/test_kits/block_writer.rs
+++ b/src/query/service/src/test_kits/block_writer.rs
@@ -144,6 +144,7 @@ impl<'a> BlockWriter<'a> {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            false,
         )?;
         let size = data.len() as u64;
         data_accessor.write(&location.0, data).await?;
diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs
index 53c32579a2ea8..3c6524487b41e 100644
--- a/src/query/storages/common/blocks/src/parquet_rs.rs
+++ b/src/query/storages/common/blocks/src/parquet_rs.rs
@@ -22,6 +22,7 @@ use parquet::arrow::ArrowWriter;
 use parquet::basic::Encoding;
 use parquet::file::properties::EnabledStatistics;
 use parquet::file::properties::WriterProperties;
+use parquet::file::properties::WriterVersion;
 use parquet::format::FileMetaData;

 /// Serialize data blocks to parquet format.
@@ -30,17 +31,41 @@ pub fn blocks_to_parquet(
     table_schema: &TableSchema,
     blocks: Vec<DataBlock>,
     write_buffer: &mut Vec<u8>,
     compression: TableCompression,
+    enable_encoding: bool,
 ) -> Result<FileMetaData> {
     assert!(!blocks.is_empty());
-    let props = WriterProperties::builder()
+    let builder = WriterProperties::builder()
         .set_compression(compression.into())
         // use `usize::MAX` to effectively limit the number of row groups to 1
         .set_max_row_group_size(usize::MAX)
-        .set_encoding(Encoding::PLAIN)
-        .set_dictionary_enabled(false)
         .set_statistics_enabled(EnabledStatistics::None)
-        .set_bloom_filter_enabled(false)
-        .build();
+        .set_bloom_filter_enabled(false);
+
+    let builder = if enable_encoding {
+        // Enable dictionary encoding and fallback encodings.
+        //
+        // Memo for quick lookup:
+        // The fallback encoding "strategy" used by parquet-54.2.1 is:
+        //
+        // ~~~
+        // (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
+        // (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
+        // (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
+        // (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
+        // (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
+        // _ => Encoding::PLAIN,
+        // ~~~
+        //
+        builder
+            .set_writer_version(WriterVersion::PARQUET_2_0)
+            .set_dictionary_enabled(true)
+    } else {
+        builder
+            .set_dictionary_enabled(false)
+            .set_encoding(Encoding::PLAIN)
+    };
+
+    let props = builder.build();
     let batches = blocks
         .into_iter()
         .map(|block| block.to_record_batch(table_schema))
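Note (not part of the patch): to see what the two branches above actually produce, here is a minimal standalone sketch, assuming the `arrow` and `parquet` crates (54.x-era APIs) and leaving compression at its default rather than wiring up `TableCompression`. It writes one batch with each property set and prints the encodings recorded in the file footers:

~~~
// Write the same batch with both WriterProperties configurations from
// blocks_to_parquet, then read back the column-chunk metadata, which
// records the encodings that were actually used.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::Encoding;
use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use parquet::file::reader::{FileReader, SerializedFileReader};

fn props(enable_encoding: bool) -> WriterProperties {
    let builder = WriterProperties::builder()
        .set_max_row_group_size(usize::MAX)
        .set_statistics_enabled(EnabledStatistics::None)
        .set_bloom_filter_enabled(false);
    let builder = if enable_encoding {
        builder
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .set_dictionary_enabled(true)
    } else {
        builder
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
    };
    builder.build()
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let batch = RecordBatch::try_from_iter([
        ("c", Arc::new(Int64Array::from_iter_values(0..1_000_000)) as ArrayRef),
        (
            "s",
            Arc::new(StringArray::from_iter_values(
                (0..1_000_000).map(|i| i.to_string()),
            )) as ArrayRef,
        ),
    ])?;

    for enable in [false, true] {
        let path = format!("/tmp/parquet_encoding_{enable}.parquet");
        let mut writer = ArrowWriter::try_new(
            std::fs::File::create(&path)?,
            batch.schema(),
            Some(props(enable)),
        )?;
        writer.write(&batch)?;
        writer.close()?;

        let reader = SerializedFileReader::new(std::fs::File::open(&path)?)?;
        for col in reader.metadata().row_group(0).columns() {
            println!("enable={enable} {}: {:?}", col.column_path(), col.encodings());
        }
    }
    Ok(())
}
~~~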
diff --git a/src/query/storages/fuse/benches/bench.rs b/src/query/storages/fuse/benches/bench.rs
index e94071f74ae17..d45511e84e32c 100644
--- a/src/query/storages/fuse/benches/bench.rs
+++ b/src/query/storages/fuse/benches/bench.rs
@@ -141,11 +141,13 @@ mod dummy {
         let max_page_size = 8192;
         let block_per_seg = 1000;
+        let enable_parquet_encoding = false;

         let write_settings = WriteSettings {
             storage_format,
             table_compression: compression,
             max_page_size,
             block_per_seg,
+            enable_parquet_encoding,
         };
         let schema = Arc::new(schema);
         let mut buffer = Vec::new();
diff --git a/src/query/storages/fuse/src/constants.rs b/src/query/storages/fuse/src/constants.rs
index 890153cb5f15b..00dc8ce8704be 100644
--- a/src/query/storages/fuse/src/constants.rs
+++ b/src/query/storages/fuse/src/constants.rs
@@ -18,12 +18,12 @@ pub const FUSE_OPT_KEY_ROW_PER_BLOCK: &str = "row_per_block";
 pub const FUSE_OPT_KEY_ROW_PER_PAGE: &str = "row_per_page";
 pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold";
 pub const FUSE_OPT_KEY_FILE_SIZE: &str = "file_size";
-
 pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";
 pub const FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP: &str = "data_retention_num_snapshots_to_keep";
 pub const FUSE_OPT_KEY_ENABLE_AUTO_VACUUM: &str = "enable_auto_vacuum";
 pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";
+pub const FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING: &str = "enable_parquet_encoding";

 pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
 pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";
diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index 1182dbdc372da..be76ce30c5d84 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -126,6 +126,7 @@ use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
 use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use crate::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP;
 use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
+use crate::FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING;
 use crate::FUSE_OPT_KEY_FILE_SIZE;
 use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;
 use crate::FUSE_OPT_KEY_ROW_PER_PAGE;
@@ -277,11 +278,14 @@ impl FuseTable {
         let block_per_seg =
             self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);

+        let enable_parquet_encoding = self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING, false);
+
         WriteSettings {
             storage_format: self.storage_format,
             table_compression: self.table_compression,
             max_page_size,
             block_per_seg,
+            enable_parquet_encoding,
         }
     }
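Note (not part of the patch): `get_write_settings` above resolves the option through `get_option` with a typed default. A hypothetical, dependency-free mirror of that pattern (the `TableOptions` struct and the fallback-to-default behavior are assumptions for illustration, not Databend's actual implementation) shows why the DDL-time validators earlier in this diff matter: an unparsable value could otherwise silently resolve to the default:

~~~
use std::collections::BTreeMap;
use std::str::FromStr;

// Hypothetical stand-in for a table carrying string-typed options.
struct TableOptions {
    options: BTreeMap<String, String>,
}

impl TableOptions {
    // Mirrors the `get_option(key, default)` pattern seen in the diff;
    // here a missing or unparsable value resolves to the default.
    fn get_option<T: FromStr>(&self, key: &str, default: T) -> T {
        self.options
            .get(key)
            .and_then(|v| v.parse::<T>().ok())
            .unwrap_or(default)
    }
}

fn main() {
    let opts = |v: &str| TableOptions {
        options: BTreeMap::from([("enable_parquet_encoding".to_string(), v.to_string())]),
    };
    assert!(opts("true").get_option("enable_parquet_encoding", false));
    // Under this (assumed) fallback behavior, a typo would silently
    // disable the feature -- hence the up-front validation at DDL time.
    assert!(!opts("ture").get_option("enable_parquet_encoding", false));
}
~~~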
b/src/query/storages/fuse/src/io/write/block_writer.rs
index 32900fa6f8e86..a0b5468af909a 100644
--- a/src/query/storages/fuse/src/io/write/block_writer.rs
+++ b/src/query/storages/fuse/src/io/write/block_writer.rs
@@ -69,8 +69,13 @@ pub fn serialize_block(
     let schema = Arc::new(schema.remove_virtual_computed_fields());
     match write_settings.storage_format {
         FuseStorageFormat::Parquet => {
-            let result =
-                blocks_to_parquet(&schema, vec![block], buf, write_settings.table_compression)?;
+            let result = blocks_to_parquet(
+                &schema,
+                vec![block],
+                buf,
+                write_settings.table_compression,
+                write_settings.enable_parquet_encoding,
+            )?;
             let meta = column_parquet_metas(&result, &schema)?;
             Ok(meta)
         }
diff --git a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs
index 1b14fa0df99ca..1c38f0bbd0402 100644
--- a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs
+++ b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs
@@ -76,6 +76,7 @@ impl BloomIndexState {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            false,
         )?;
         let data_size = data.len() as u64;
         Ok(Self {
diff --git a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs
index 7ccbadc328e50..7314ffbbea6c1 100644
--- a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs
+++ b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs
@@ -288,6 +288,7 @@ impl VirtualColumnBuilder {
             vec![virtual_block],
             &mut data,
             write_settings.table_compression,
+            false,
         )?;
         let draft_virtual_column_metas = self.file_meta_to_virtual_column_metas(
diff --git a/src/query/storages/fuse/src/io/write/write_settings.rs b/src/query/storages/fuse/src/io/write/write_settings.rs
index 763942c018916..9425056066698 100644
--- a/src/query/storages/fuse/src/io/write/write_settings.rs
+++ b/src/query/storages/fuse/src/io/write/write_settings.rs
@@ -28,6 +28,7 @@ pub struct WriteSettings {
     pub max_page_size: usize,

     pub block_per_seg: usize,
+    pub enable_parquet_encoding: bool,
 }

 impl Default for WriteSettings {
@@ -37,6 +38,7 @@ impl Default for WriteSettings {
             table_compression: TableCompression::default(),
             max_page_size: DEFAULT_ROW_PER_PAGE,
             block_per_seg: DEFAULT_BLOCK_PER_SEGMENT,
+            enable_parquet_encoding: false,
         }
     }
 }
diff --git a/src/query/storages/result_cache/src/write/writer.rs b/src/query/storages/result_cache/src/write/writer.rs
index 0ba10167d2762..a2299b2832daa 100644
--- a/src/query/storages/result_cache/src/write/writer.rs
+++ b/src/query/storages/result_cache/src/write/writer.rs
@@ -72,11 +72,13 @@ impl ResultCacheWriter {
     #[async_backtrace::framed]
     pub async fn write_to_storage(&self) -> Result<String> {
         let mut buf = Vec::with_capacity(self.current_bytes);
+        // TODO doc why encoding is not enabled
         let _ = blocks_to_parquet(
            &self.schema,
            self.blocks.clone(),
            &mut buf,
            TableCompression::None,
+            false,
        )?;

         let file_location = format!("{}/{}.parquet", self.location, Uuid::new_v4().as_simple());
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test
new file mode 100644
index 0000000000000..e80ae5b6c1a62
--- /dev/null
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test
@@ -0,0 +1,124 @@
+statement ok
+create or replace database test_tbl_opt_parquet_encoding;
+
+statement ok
+use test_tbl_opt_parquet_encoding;
+
+#############################################
+# Create table with parquet encoding option #
+#############################################
+
+statement ok
+create or replace table t_encoded (c int, s string) enable_parquet_encoding = 'true' compression = 'lz4';
+
+statement ok
+create or replace table t(c int, s string) compression = 'lz4';
+
+statement ok
+insert into t_encoded(c, s) select number as c, to_string(number) as s from numbers(1000000);
+
+statement ok
+optimize table t_encoded compact;
+
+statement ok
+insert into t(c, s) select number as c, to_string(number) as s from numbers(1000000);
+
+statement ok
+optimize table t compact;
+
+# In this case, lz4 with encoding produces smaller block files
+query T
+with
+  e as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 't_encoded') limit 1),
+  p as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 't') limit 1)
+  select e.c < p.c from e, p
+----
+1
+
+
+################################
+# Alter table parquet encoding #
+################################
+
+
+# 1. prepare plain-encoded data and keep the file size
+statement ok
+create or replace table tbl (c int, s string) compression = 'lz4';
+
+statement ok
+insert into tbl(c, s) select number as c, to_string(number) as s from numbers(1000000);
+
+# insertion might be executed in a distributed manner; in that case, data blocks might be fragmented
+statement ok
+optimize table tbl compact;
+
+statement ok
+create temp table tbl_size(s uint64);
+
+statement ok
+insert into tbl_size select bytes_compressed from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1;
+
+
+# 2. truncate table data and insert the same data with parquet encoding enabled
+statement ok
+truncate table tbl;
+
+statement ok
+ALTER TABLE tbl SET OPTIONS (enable_parquet_encoding = 'true');
+
+statement ok
+insert into tbl(c, s) select number as c, to_string(number) as s from numbers(1000000);
+
+# insertion might be executed in a distributed manner; in that case, data blocks might be fragmented, so compact them
+statement ok
+optimize table tbl compact;
+
+
+# 3. check that the file size of newly created blocks with encoding is smaller
+
+query T
+with
+  e as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1),
+  p as (select s as c from tbl_size)
+  select e.c < p.c from e,p
+----
+1
+
+# keep the size, will be used later
+statement ok
+create temp table e_tbl_size(s uint64);
+
+statement ok
+insert into e_tbl_size select bytes_compressed from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1;
+
+# 4. check that table option `enable_parquet_encoding` can be turned off
+
+statement ok
+truncate table tbl;
+
+statement ok
+ALTER TABLE tbl SET OPTIONS (enable_parquet_encoding = 'false');
+
+statement ok
+insert into tbl(c, s) select number as c, to_string(number) as s from numbers(1000000);
+
+statement ok
+optimize table tbl compact;
+
+
+# 5. check that the file size of newly created blocks without encoding is larger
+query T
+with
+  p as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1),
+  e as (select s as c from e_tbl_size)
+  select e.c < p.c from e,p
+----
+1
+
+
+# Test invalid option value
+
+statement error 1001
+ALTER TABLE tbl SET OPTIONS (enable_parquet_encoding = 'invalid');