Skip to content

Commit d2d45b1

Browse files
authored
feat: new table option enable_auto_vacuum (#17884)
* feat: new table option `enable_auto_vacuum` * cleanup
1 parent 3047dc1 commit d2d45b1

File tree

6 files changed

+136
-8
lines changed

6 files changed

+136
-8
lines changed

src/query/service/src/interpreters/common/table_option_validation.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::any::type_name;
1516
use std::collections::BTreeMap;
1617
use std::collections::HashSet;
18+
use std::str::FromStr;
1719
use std::sync::LazyLock;
1820

1921
use chrono::Duration;
@@ -27,6 +29,7 @@ use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
2729
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
2830
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP;
2931
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
32+
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_VACUUM;
3033
use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE;
3134
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
3235
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
@@ -62,6 +65,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
6265
r.insert(FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD);
6366
r.insert(FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS);
6467
r.insert(FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP);
68+
r.insert(FUSE_OPT_KEY_ENABLE_AUTO_VACUUM);
6569

6670
r.insert(OPT_KEY_BLOOM_INDEX_COLUMNS);
6771
r.insert(OPT_KEY_TABLE_COMPRESSION);
@@ -212,17 +216,30 @@ pub fn is_valid_bloom_index_columns(
212216
pub fn is_valid_change_tracking(
213217
options: &BTreeMap<String, String>,
214218
) -> databend_common_exception::Result<()> {
215-
if let Some(value) = options.get(OPT_KEY_CHANGE_TRACKING) {
216-
value.to_lowercase().parse::<bool>()?;
217-
}
218-
Ok(())
219+
is_valid_option_of_type::<bool>(options, OPT_KEY_CHANGE_TRACKING)
219220
}
220221

221222
pub fn is_valid_random_seed(
222223
options: &BTreeMap<String, String>,
223224
) -> databend_common_exception::Result<()> {
224-
if let Some(value) = options.get(OPT_KEY_RANDOM_SEED) {
225-
value.parse::<u64>()?;
225+
is_valid_option_of_type::<u64>(options, OPT_KEY_RANDOM_SEED)
226+
}
227+
228+
pub fn is_valid_option_of_type<T: FromStr>(
229+
options: &BTreeMap<String, String>,
230+
option_name: &str,
231+
) -> databend_common_exception::Result<()>
232+
where
233+
<T as FromStr>::Err: std::fmt::Display,
234+
{
235+
if let Some(value) = options.get(option_name) {
236+
value.parse::<T>().map_err(|e| {
237+
let msg = format!(
238+
"Failed to parse value [{value}] for table option '{option_name}' as type {}: {e}",
239+
type_name::<T>(),
240+
);
241+
ErrorCode::TableOptionInvalid(msg)
242+
})?;
226243
}
227244
Ok(())
228245
}

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use databend_common_sql::DefaultExprBinder;
4848
use databend_common_storages_fuse::io::MetaReaders;
4949
use databend_common_storages_fuse::FuseSegmentFormat;
5050
use databend_common_storages_fuse::FuseStorageFormat;
51+
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_VACUUM;
5152
use databend_common_users::RoleCacheManager;
5253
use databend_common_users::UserApiProvider;
5354
use databend_enterprise_attach_table::get_attach_table_handler;
@@ -69,6 +70,7 @@ use crate::interpreters::common::table_option_validation::is_valid_bloom_index_c
6970
use crate::interpreters::common::table_option_validation::is_valid_change_tracking;
7071
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
7172
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
73+
use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
7274
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
7375
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
7476
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
@@ -458,6 +460,9 @@ impl CreateTableInterpreter {
458460
// check table level data_retention_period_in_hours
459461
is_valid_data_retention_period(&table_meta.options)?;
460462

463+
// Like the `enable_auto_vacuum` setting, the table option FUSE_OPT_KEY_ENABLE_AUTO_VACUUM must be an unsigned integer
464+
is_valid_option_of_type::<u32>(&table_meta.options, FUSE_OPT_KEY_ENABLE_AUTO_VACUUM)?;
465+
461466
for table_option in table_meta.options.iter() {
462467
let key = table_option.0.to_lowercase();
463468
if !is_valid_create_opt(&key, &self.plan.engine) {

src/query/service/src/interpreters/interpreter_table_set_options.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use databend_common_storages_fuse::segment_format_from_location;
3232
use databend_common_storages_fuse::FuseSegmentFormat;
3333
use databend_common_storages_fuse::FuseTable;
3434
use databend_common_storages_fuse::TableContext;
35+
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_VACUUM;
3536
use databend_storages_common_table_meta::meta::column_oriented_segment::AbstractSegment;
3637
use databend_storages_common_table_meta::meta::column_oriented_segment::ColumnOrientedSegmentBuilder;
3738
use databend_storages_common_table_meta::meta::column_oriented_segment::SegmentBuilder;
@@ -52,6 +53,7 @@ use crate::interpreters::common::table_option_validation::is_valid_block_per_seg
5253
use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
5354
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
5455
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
56+
use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
5557
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
5658
use crate::interpreters::Interpreter;
5759
use crate::pipelines::PipelineBuildResult;
@@ -120,6 +122,10 @@ impl Interpreter for SetOptionsInterpreter {
120122
OPT_KEY_CLUSTER_TYPE
121123
)));
122124
}
125+
126+
// Like the `enable_auto_vacuum` setting, the table option FUSE_OPT_KEY_ENABLE_AUTO_VACUUM must be an unsigned integer
127+
is_valid_option_of_type::<u32>(&self.plan.set_options, FUSE_OPT_KEY_ENABLE_AUTO_VACUUM)?;
128+
123129
let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?;
124130
let database = self.plan.database.as_str();
125131
let table_name = self.plan.table.as_str();

src/query/storages/fuse/src/constants.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub const FUSE_OPT_KEY_FILE_SIZE: &str = "file_size";
2222
pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";
2323
pub const FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP: &str =
2424
"data_retention_num_snapshots_to_keep";
25-
25+
pub const FUSE_OPT_KEY_ENABLE_AUTO_VACUUM: &str = "enable_auto_vacuum";
2626
pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";
2727

2828
pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";

src/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use crate::operations::SnapshotGenerator;
5959
use crate::operations::TransformMergeCommitMeta;
6060
use crate::operations::TruncateGenerator;
6161
use crate::FuseTable;
62+
use crate::FUSE_OPT_KEY_ENABLE_AUTO_VACUUM;
6263
enum State {
6364
None,
6465
FillDefault,
@@ -177,14 +178,31 @@ where F: SnapshotGenerator + Send + Sync + 'static
177178
) -> Result<Option<PurgeMode>> {
178179
let mode = if Self::need_to_purge_all_history(table, snapshot_gen) {
179180
Some(PurgeMode::PurgeAllHistory)
180-
} else if ctx.get_settings().get_enable_auto_vacuum()? {
181+
} else if Self::is_auto_vacuum_enabled(ctx, table)? {
181182
Some(PurgeMode::PurgeAccordingToRetention)
182183
} else {
183184
None
184185
};
185186
Ok(mode)
186187
}
187188

189+
fn is_auto_vacuum_enabled(ctx: &dyn TableContext, table: &FuseTable) -> Result<bool> {
190+
// Priority for auto vacuum:
191+
// - If table-level option `FUSE_OPT_KEY_ENABLE_AUTO_VACUUM` is set, it takes precedence
192+
// - If table-level option is not set, fall back to the setting
193+
match table
194+
.table_info
195+
.options()
196+
.get(FUSE_OPT_KEY_ENABLE_AUTO_VACUUM)
197+
{
198+
Some(v) => {
199+
let enabled = v.parse::<u32>()? != 0;
200+
Ok(enabled)
201+
}
202+
None => ctx.get_settings().get_enable_auto_vacuum(),
203+
}
204+
}
205+
188206
fn is_error_recoverable(&self, e: &ErrorCode) -> bool {
189207
let code = e.code();
190208
// When prev_snapshot_id is some, means it is an alter table column modification or truncate.
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
## Copyright 2023 Databend Cloud
2+
##
3+
## Licensed under the Elastic License, Version 2.0 (the "License");
4+
## you may not use this file except in compliance with the License.
5+
## You may obtain a copy of the License at
6+
##
7+
## https://www.elastic.co/licensing/elastic-license
8+
##
9+
## Unless required by applicable law or agreed to in writing, software
10+
## distributed under the License is distributed on an "AS IS" BASIS,
11+
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
## See the License for the specific language governing permissions and
13+
## limitations under the License.
14+
15+
statement ok
16+
create or replace database auto_vacuum_tbl_opt;
17+
18+
statement ok
19+
use auto_vacuum_tbl_opt;
20+
21+
statement ok
22+
create or replace table t (c int) 'fs:///tmp/tbl_auto_vacuum/' data_retention_num_snapshots_to_keep = 1;
23+
24+
statement ok
25+
create or replace stage stage_av url = 'fs:///tmp/tbl_auto_vacuum/';
26+
27+
# CASE 1: By default, no auto vacuum should be triggered
28+
statement ok
29+
insert into t values(1);
30+
31+
statement ok
32+
insert into t values(2);
33+
34+
onlyif mysql
35+
query I
36+
select count() from list_stage(location=> '@stage_av') where name like '%_ss%';
37+
----
38+
2
39+
40+
# CASE 2: Table level option should have higher priority than settings
41+
42+
# CASE 2.1: Enable auto_vacuum at table level
43+
# make sure auto_vacuum is disabled at session setting level
44+
statement ok
45+
set enable_auto_vacuum = 0;
46+
47+
statement ok
48+
alter table t set options(enable_auto_vacuum = 1);
49+
50+
statement ok
51+
insert into t values(3);
52+
53+
# Auto vacuum should be triggered
54+
onlyif mysql
55+
query I
56+
select count() from list_stage(location=> '@stage_av') where name like '%_ss%';
57+
----
58+
1
59+
60+
# CASE 2.2: Disable auto_vacuum at table level
61+
statement ok
62+
set enable_auto_vacuum = 1;
63+
64+
statement ok
65+
alter table t set options(enable_auto_vacuum = 0);
66+
67+
statement ok
68+
insert into t values(3);
69+
70+
# Auto vacuum should NOT be triggered
71+
onlyif mysql
72+
query I
73+
select count() from list_stage(location=> '@stage_av') where name like '%_ss%';
74+
----
75+
2
76+
77+
statement ok
78+
remove @stage_av;
79+
80+
statement ok
81+
drop stage stage_av;
82+

0 commit comments

Comments
 (0)