Skip to content

Commit 0368216

Browse files
authored
feat: new table option data_retention_period_in_hours (#16266)
* feat: new table option `data_retention_period_in_hours` * refactor: rename things * add logic test * adjust logic test
1 parent bb36737 commit 0368216

File tree

12 files changed

+97
-11
lines changed

12 files changed

+97
-11
lines changed

src/common/io/src/constants.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,6 @@ pub const DEFAULT_BLOCK_INDEX_BUFFER_SIZE: usize = 300 * 1024;
3232
pub const DEFAULT_BLOCK_MAX_ROWS: usize = 1000 * 1000;
3333
// The min number of a block by default.
3434
pub const DEFAULT_BLOCK_MIN_ROWS: usize = 800 * 1000;
35+
36+
// The min values of table option data_retention_period_in_hours
37+
pub const DEFAULT_MIN_TABLE_LEVEL_DATA_RETENTION_PERIOD_IN_HOURS: u64 = 1;

src/query/ee/src/storages/fuse/operations/vacuum_table.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::sync::Arc;
1717
use std::time::Instant;
1818

1919
use chrono::DateTime;
20-
use chrono::Duration;
2120
use chrono::Utc;
2221
use databend_common_catalog::table::NavigationPoint;
2322
use databend_common_catalog::table::Table;
@@ -355,7 +354,7 @@ pub async fn do_vacuum(
355354
.await?;
356355
let status = format!("do_vacuum: purged table, cost:{:?}", start.elapsed());
357356
ctx.set_status_info(&status);
358-
let retention = Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64);
357+
let retention = fuse_table.get_data_retention_period(ctx.as_ref())?;
359358
// use min(now - get_retention_period(), retention_time) as gc orphan files retention time
360359
// to protect files that generated by txn which has not been committed being gc.
361360
let retention_time = std::cmp::min(chrono::Utc::now() - retention, retention_time);

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::collections::HashSet;
1717
use std::sync::Arc;
1818
use std::sync::LazyLock;
1919

20+
use chrono::Duration;
2021
use chrono::Utc;
2122
use databend_common_base::runtime::GlobalIORuntime;
2223
use databend_common_config::GlobalConfig;
@@ -26,6 +27,7 @@ use databend_common_expression::is_internal_column;
2627
use databend_common_expression::TableSchemaRef;
2728
use databend_common_expression::TableSchemaRefExt;
2829
use databend_common_io::constants::DEFAULT_BLOCK_MAX_ROWS;
30+
use databend_common_io::constants::DEFAULT_MIN_TABLE_LEVEL_DATA_RETENTION_PERIOD_IN_HOURS;
2931
use databend_common_license::license::Feature;
3032
use databend_common_license::license::Feature::ComputedColumn;
3133
use databend_common_license::license::Feature::InvertedIndex;
@@ -42,12 +44,14 @@ use databend_common_meta_app::schema::TableNameIdent;
4244
use databend_common_meta_app::schema::TableStatistics;
4345
use databend_common_meta_types::MatchSeq;
4446
use databend_common_pipeline_core::ExecutionInfo;
47+
use databend_common_settings::Settings;
4548
use databend_common_sql::field_default_value;
4649
use databend_common_sql::plans::CreateTablePlan;
4750
use databend_common_sql::BloomIndexColumns;
4851
use databend_common_storages_fuse::io::MetaReaders;
4952
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
5053
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
54+
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
5155
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
5256
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
5357
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_PAGE;
@@ -448,6 +452,8 @@ impl CreateTableInterpreter {
448452
is_valid_change_tracking(&table_meta.options)?;
449453
// check random seed
450454
is_valid_random_seed(&table_meta.options)?;
455+
// check table level data_retention_period_in_hours
456+
is_valid_data_retention_period(&table_meta.options)?;
451457

452458
for table_option in table_meta.options.iter() {
453459
let key = table_option.0.to_lowercase();
@@ -498,6 +504,7 @@ pub static CREATE_TABLE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new
498504
r.insert(FUSE_OPT_KEY_ROW_PER_BLOCK);
499505
r.insert(FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD);
500506
r.insert(FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD);
507+
r.insert(FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS);
501508

502509
r.insert(OPT_KEY_BLOOM_INDEX_COLUMNS);
503510
r.insert(OPT_KEY_TABLE_COMPRESSION);
@@ -561,6 +568,32 @@ pub fn is_valid_row_per_block(options: &BTreeMap<String, String>) -> Result<()>
561568
Ok(())
562569
}
563570

571+
pub fn is_valid_data_retention_period(options: &BTreeMap<String, String>) -> Result<()> {
572+
if let Some(value) = options.get(FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS) {
573+
let new_duration_in_hours = value.parse::<u64>()?;
574+
575+
if new_duration_in_hours < DEFAULT_MIN_TABLE_LEVEL_DATA_RETENTION_PERIOD_IN_HOURS {
576+
return Err(ErrorCode::TableOptionInvalid(format!(
577+
"Invalid data_retention_period_in_hours {:?}, it should not be lesser than {:?}",
578+
new_duration_in_hours, DEFAULT_MIN_TABLE_LEVEL_DATA_RETENTION_PERIOD_IN_HOURS
579+
)));
580+
}
581+
582+
let default_max_period_in_days = Settings::get_max_data_retention_period_in_days();
583+
584+
let default_max_duration = Duration::days(default_max_period_in_days as i64);
585+
let new_duration = Duration::hours(new_duration_in_hours as i64);
586+
587+
if new_duration > default_max_duration {
588+
return Err(ErrorCode::TableOptionInvalid(format!(
589+
"Invalid data_retention_period_in_hours {:?}, it should not be larger than {:?}",
590+
new_duration, default_max_duration
591+
)));
592+
}
593+
}
594+
Ok(())
595+
}
596+
564597
pub fn is_valid_bloom_index_columns(
565598
options: &BTreeMap<String, String>,
566599
schema: TableSchemaRef,

src/query/service/src/interpreters/interpreter_table_set_options.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use log::error;
3232
use super::interpreter_table_create::is_valid_block_per_segment;
3333
use super::interpreter_table_create::is_valid_bloom_index_columns;
3434
use super::interpreter_table_create::is_valid_create_opt;
35+
use super::interpreter_table_create::is_valid_data_retention_period;
3536
use super::interpreter_table_create::is_valid_row_per_block;
3637
use crate::interpreters::Interpreter;
3738
use crate::pipelines::PipelineBuildResult;
@@ -66,6 +67,9 @@ impl Interpreter for SetOptionsInterpreter {
6667
is_valid_block_per_segment(&self.plan.set_options)?;
6768
// check row_per_block
6869
is_valid_row_per_block(&self.plan.set_options)?;
70+
// check row_per_block
71+
is_valid_data_retention_period(&self.plan.set_options)?;
72+
6973
// check storage_format
7074
let error_str = "invalid opt for fuse table in alter table statement";
7175
if self.plan.set_options.contains_key(OPT_KEY_STORAGE_FORMAT) {

src/query/service/src/interpreters/interpreter_table_vacuum.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use std::sync::Arc;
1616

17-
use chrono::Duration;
1817
use databend_common_catalog::table::TableExt;
1918
use databend_common_exception::Result;
2019
use databend_common_expression::types::StringType;
@@ -124,12 +123,12 @@ impl Interpreter for VacuumTableInterpreter {
124123
// check mutability
125124
table.check_mutable()?;
126125

127-
let duration = Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64);
126+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
127+
let duration = fuse_table.get_data_retention_period(ctx.as_ref())?;
128128

129129
let retention_time = chrono::Utc::now() - duration;
130130
let ctx = self.ctx.clone();
131131

132-
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
133132
let handler = get_vacuum_handler();
134133
let purge_files_opt = handler
135134
.do_vacuum(

src/query/settings/src/settings_default.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -847,7 +847,7 @@ impl DefaultSettings {
847847
/// The maximum number of days that data can be retained.
848848
/// The max is read from the global config:data_retention_time_in_days_max
849849
/// If the global config is not set, the default value is 90 days.
850-
fn data_retention_time_in_days_max() -> u64 {
850+
pub(crate) fn data_retention_time_in_days_max() -> u64 {
851851
match GlobalConfig::try_get_instance() {
852852
None => 90,
853853
Some(conf) => conf.query.data_retention_time_in_days_max,

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,4 +681,8 @@ impl Settings {
681681
pub fn get_enable_last_snapshot_location_hint(&self) -> Result<bool> {
682682
Ok(self.try_get_u64("enable_last_snapshot_location_hint")? == 1)
683683
}
684+
685+
pub fn get_max_data_retention_period_in_days() -> u64 {
686+
DefaultSettings::data_retention_time_in_days_max()
687+
}
684688
}

src/query/storages/fuse/src/constants.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ pub const FUSE_OPT_KEY_ROW_PER_BLOCK: &str = "row_per_block";
1818
pub const FUSE_OPT_KEY_ROW_PER_PAGE: &str = "row_per_page";
1919
pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold";
2020

21+
pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";
22+
2123
pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
2224
pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";
2325
pub const FUSE_TBL_XOR_BLOOM_INDEX_PREFIX: &str = "_i_b_v2";

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use std::str;
2121
use std::str::FromStr;
2222
use std::sync::Arc;
2323

24+
use chrono::Duration;
25+
use chrono::TimeDelta;
2426
use databend_common_catalog::catalog::StorageDescription;
2527
use databend_common_catalog::plan::DataSourcePlan;
2628
use databend_common_catalog::plan::PartStatistics;
@@ -109,6 +111,7 @@ use crate::DEFAULT_ROW_PER_PAGE;
109111
use crate::DEFAULT_ROW_PER_PAGE_FOR_BLOCKING;
110112
use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
111113
use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
114+
use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
112115
use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;
113116
use crate::FUSE_OPT_KEY_ROW_PER_PAGE;
114117
use crate::FUSE_TBL_LAST_SNAPSHOT_HINT;
@@ -438,7 +441,7 @@ impl FuseTable {
438441
})
439442
}
440443

441-
pub fn transient(&self) -> bool {
444+
pub fn is_transient(&self) -> bool {
442445
self.table_info.meta.options.contains_key("TRANSIENT")
443446
}
444447

@@ -476,6 +479,21 @@ impl FuseTable {
476479
.map(|v| v.data_type().clone())
477480
.collect()
478481
}
482+
483+
pub fn get_data_retention_period(&self, ctx: &dyn TableContext) -> Result<TimeDelta> {
484+
let retention_period = if let Some(v) = self
485+
.table_info
486+
.meta
487+
.options
488+
.get(FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS)
489+
{
490+
let retention_period = v.parse::<u64>()?;
491+
Duration::hours(retention_period as i64)
492+
} else {
493+
Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64)
494+
};
495+
Ok(retention_period)
496+
}
479497
}
480498

481499
#[async_trait::async_trait]

src/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ where F: SnapshotGenerator + Send + 'static
177177
}
178178

179179
fn do_purge(table: &FuseTable, snapshot_gen: &F) -> bool {
180-
if table.transient() {
180+
if table.is_transient() {
181181
return true;
182182
}
183183

0 commit comments

Comments
 (0)