Skip to content

Commit 29ccc92

Browse files
committed
feat: new table option FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING
When this option is set to true, fuse table with parquet storage format will enable encoding and dictionary during serialization. The defult encoding policy of paquet-rs will be used.
1 parent b32ab7c commit 29ccc92

File tree

12 files changed

+102
-9
lines changed

12 files changed

+102
-9
lines changed

src/query/formats/src/output_format/parquet.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ impl OutputFormat for ParquetOutputFormat {
5353
return Ok(vec![]);
5454
}
5555
let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
56-
let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd)?;
56+
// Unloading data, enable encoding unconditionally in this case, since ...
57+
let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd, true)?;
5758
Ok(buf)
5859
}
5960
}

src/query/service/src/interpreters/common/table_option_validation.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
3030
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP;
3131
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
3232
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_VACUUM;
33+
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING;
3334
use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE;
3435
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
3536
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
@@ -83,6 +84,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
8384
r.insert(OPT_KEY_TEMP_PREFIX);
8485
r.insert(OPT_KEY_SEGMENT_FORMAT);
8586
r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
87+
r.insert(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING);
8688
r
8789
});
8890

@@ -243,3 +245,19 @@ where
243245
}
244246
Ok(())
245247
}
248+
249+
pub fn is_valid_fuse_parquet_encoding_opt(
250+
options: &BTreeMap<String, String>,
251+
) -> databend_common_exception::Result<()> {
252+
is_valid_bool_opt(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING, options)
253+
}
254+
255+
fn is_valid_bool_opt(
256+
key: &str,
257+
options: &BTreeMap<String, String>,
258+
) -> databend_common_exception::Result<()> {
259+
if let Some(value) = options.get(key) {
260+
value.parse::<bool>()?;
261+
}
262+
Ok(())
263+
}

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ use crate::interpreters::common::table_option_validation::is_valid_bloom_index_c
7070
use crate::interpreters::common::table_option_validation::is_valid_change_tracking;
7171
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
7272
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
73+
use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_encoding_opt;
7374
use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
7475
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
7576
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
@@ -459,6 +460,8 @@ impl CreateTableInterpreter {
459460
is_valid_random_seed(&table_meta.options)?;
460461
// check table level data_retention_period_in_hours
461462
is_valid_data_retention_period(&table_meta.options)?;
463+
// check enable_parquet_encoding
464+
is_valid_fuse_parquet_encoding_opt(&table_meta.options)?;
462465

463466
// Same as settings of FUSE_OPT_KEY_ENABLE_AUTO_VACUUM, expect value type is unsigned integer
464467
is_valid_option_of_type::<u32>(&table_meta.options, FUSE_OPT_KEY_ENABLE_AUTO_VACUUM)?;

src/query/service/src/interpreters/interpreter_table_set_options.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use crate::interpreters::common::table_option_validation::is_valid_block_per_seg
5353
use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
5454
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
5555
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
56+
use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_encoding_opt;
5657
use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
5758
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
5859
use crate::interpreters::Interpreter;
@@ -90,6 +91,8 @@ impl Interpreter for SetOptionsInterpreter {
9091
is_valid_row_per_block(&self.plan.set_options)?;
9192
// check data_retention_period
9293
is_valid_data_retention_period(&self.plan.set_options)?;
94+
// check enable_parquet_encoding
95+
is_valid_fuse_parquet_encoding_opt(&self.plan.set_options)?;
9396

9497
// check storage_format
9598
let error_str = "invalid opt for fuse table in alter table statement";

src/query/service/src/test_kits/block_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ impl<'a> BlockWriter<'a> {
144144
vec![index_block],
145145
&mut data,
146146
TableCompression::None,
147+
false,
147148
)?;
148149
let size = data.len() as u64;
149150
data_accessor.write(&location.0, data).await?;

src/query/storages/common/blocks/src/parquet_rs.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use parquet::arrow::ArrowWriter;
2222
use parquet::basic::Encoding;
2323
use parquet::file::properties::EnabledStatistics;
2424
use parquet::file::properties::WriterProperties;
25+
use parquet::file::properties::WriterVersion;
2526
use parquet::format::FileMetaData;
2627

2728
/// Serialize data blocks to parquet format.
@@ -30,17 +31,41 @@ pub fn blocks_to_parquet(
3031
blocks: Vec<DataBlock>,
3132
write_buffer: &mut Vec<u8>,
3233
compression: TableCompression,
34+
enable_encoding: bool,
3335
) -> Result<FileMetaData> {
3436
assert!(!blocks.is_empty());
35-
let props = WriterProperties::builder()
37+
let builder = WriterProperties::builder()
3638
.set_compression(compression.into())
3739
// use `usize::MAX` to effectively limit the number of row groups to 1
3840
.set_max_row_group_size(usize::MAX)
39-
.set_encoding(Encoding::PLAIN)
40-
.set_dictionary_enabled(false)
4141
.set_statistics_enabled(EnabledStatistics::None)
42-
.set_bloom_filter_enabled(false)
43-
.build();
42+
.set_bloom_filter_enabled(false);
43+
44+
let builder = if enable_encoding {
45+
// Enable dictionary encoding and fallback encodings.
46+
//
47+
// Memo for quick lookup:
48+
// The fallback encoding "strategy" used by parquet-54.2.1 is:
49+
//
50+
// ~~~
51+
// (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
52+
// (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
53+
// (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
54+
// (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
55+
// (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
56+
// _ => Encoding::PLAIN,
57+
// ~~~
58+
//
59+
builder
60+
.set_writer_version(WriterVersion::PARQUET_2_0)
61+
.set_dictionary_enabled(true)
62+
} else {
63+
builder
64+
.set_dictionary_enabled(false)
65+
.set_encoding(Encoding::PLAIN)
66+
};
67+
68+
let props = builder.build();
4469
let batches = blocks
4570
.into_iter()
4671
.map(|block| block.to_record_batch(table_schema))

src/query/storages/fuse/src/constants.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ pub const FUSE_OPT_KEY_ROW_PER_BLOCK: &str = "row_per_block";
1818
pub const FUSE_OPT_KEY_ROW_PER_PAGE: &str = "row_per_page";
1919
pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold";
2020
pub const FUSE_OPT_KEY_FILE_SIZE: &str = "file_size";
21-
2221
pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";
2322
pub const FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP: &str =
2423
"data_retention_num_snapshots_to_keep";
2524
pub const FUSE_OPT_KEY_ENABLE_AUTO_VACUUM: &str = "enable_auto_vacuum";
2625
pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";
26+
pub const FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING: &str = "enable_parquet_encoding";
2727

2828
pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
2929
pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
126126
use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
127127
use crate::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP;
128128
use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
129+
use crate::FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING;
129130
use crate::FUSE_OPT_KEY_FILE_SIZE;
130131
use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;
131132
use crate::FUSE_OPT_KEY_ROW_PER_PAGE;
@@ -277,11 +278,14 @@ impl FuseTable {
277278
let block_per_seg =
278279
self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
279280

281+
let enable_parquet_encoding = self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING, false);
282+
280283
WriteSettings {
281284
storage_format: self.storage_format,
282285
table_compression: self.table_compression,
283286
max_page_size,
284287
block_per_seg,
288+
enable_parquet_encoding,
285289
}
286290
}
287291

src/query/storages/fuse/src/io/write/block_writer.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,13 @@ pub fn serialize_block(
6969
let schema = Arc::new(schema.remove_virtual_computed_fields());
7070
match write_settings.storage_format {
7171
FuseStorageFormat::Parquet => {
72-
let result =
73-
blocks_to_parquet(&schema, vec![block], buf, write_settings.table_compression)?;
72+
let result = blocks_to_parquet(
73+
&schema,
74+
vec![block],
75+
buf,
76+
write_settings.table_compression,
77+
write_settings.enable_parquet_encoding,
78+
)?;
7479
let meta = column_parquet_metas(&result, &schema)?;
7580
Ok(meta)
7681
}

src/query/storages/fuse/src/io/write/write_settings.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub struct WriteSettings {
2828
pub max_page_size: usize,
2929

3030
pub block_per_seg: usize,
31+
pub enable_parquet_encoding: bool,
3132
}
3233

3334
impl Default for WriteSettings {
@@ -37,6 +38,7 @@ impl Default for WriteSettings {
3738
table_compression: TableCompression::default(),
3839
max_page_size: DEFAULT_ROW_PER_PAGE,
3940
block_per_seg: DEFAULT_BLOCK_PER_SEGMENT,
41+
enable_parquet_encoding: false,
4042
}
4143
}
4244
}

0 commit comments

Comments
 (0)