From 9238c0aa546db93028f647ad0a610b9ca1dbd9f8 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 26 Apr 2024 14:54:18 +0800 Subject: [PATCH 1/4] chore: tweak transient table data retention policy - introduce new setting `transient_data_retention_time_in_minutes` which set the retention period (in minutes) of transient table - during purging data of transient table use the value of setting `transient_data_retention_time_in_minutes` in table navigation --- src/query/settings/src/settings_default.rs | 6 ++ .../settings/src/settings_getter_setter.rs | 4 + src/query/storages/fuse/src/fuse_table.rs | 6 +- .../common/processors/sink_commit.rs | 80 ++++++++++++++----- .../storages/fuse/src/operations/navigate.rs | 11 ++- 5 files changed, 85 insertions(+), 22 deletions(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 265ebf4d4171f..054f2e0a3f8b5 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -159,6 +159,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=data_retention_time_in_days_max)), }), + ("transient_data_retention_time_in_minutes", DefaultSettingValue { + value: UserSettingValue::UInt64(60), + desc: "Sets the data retention time in minutes.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..= 24 * 60)), + }), ("max_storage_io_requests", DefaultSettingValue { value: UserSettingValue::UInt64(default_max_storage_io_requests), desc: "Sets the maximum number of concurrent I/O requests.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 16c957004fc48..f27e772063a0e 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -183,6 +183,10 @@ impl Settings { self.try_get_u64("data_retention_time_in_days") } + pub fn get_transient_data_retention_time_in_minutes(&self) -> Result { + self.try_get_u64("transient_data_retention_time_in_minutes") + } + pub fn get_max_storage_io_requests(&self) -> Result { self.try_get_u64("max_storage_io_requests") } diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 29e8b910ec37e..8e6dd3ddee5eb 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -718,7 +718,11 @@ impl Table for FuseTable { keep_last_snapshot: bool, dry_run: bool, ) -> Result>> { - match self.navigate_for_purge(&ctx, instant).await { + let by_pass_retention_period_check = false; + match self + .navigate_for_purge(&ctx, instant, by_pass_retention_period_check) + .await + { Ok((table, files)) => { table .do_purge(&ctx, files, num_snapshot_limit, keep_last_snapshot, dry_run) diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 6061b44dfeb3b..429d60a0c8873 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -20,6 +20,7 @@ use std::time::Instant; use backoff::backoff::Backoff; use backoff::ExponentialBackoff; use databend_common_catalog::lock::Lock; +use databend_common_catalog::table::NavigationPoint; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableExt; use databend_common_catalog::table_context::TableContext; @@ -351,6 +352,7 @@ where F: SnapshotGenerator + Send + 'static self.dal.write(&location, data).await?; + let snapshot_timestamp = snapshot.timestamp.unwrap(); let catalog = self.ctx.get_catalog(table_info.catalog()).await?; match FuseTable::update_table_meta( catalog.clone(), @@ -380,31 +382,73 @@ where F: SnapshotGenerator + Send + 'static let latest = self.table.refresh(self.ctx.as_ref()).await?; let tbl = FuseTable::try_from_table(latest.as_ref())?; - warn!( - "table detected, purging historical data. ({})", - tbl.table_info.ident - ); + let transient_data_retention_minutes = self + .ctx + .get_settings() + .get_transient_data_retention_time_in_minutes()?; + + let instant = snapshot_timestamp + - chrono::Duration::minutes( + transient_data_retention_minutes as i64, + ); let keep_last_snapshot = true; - let snapshot_files = tbl.list_snapshot_files().await?; - if let Err(e) = tbl - .do_purge( + let dry_run = false; + let by_pass_retention_period_checking = true; + + info!( + "purging historical data. (name{}, id {})", + tbl.table_info.name, tbl.table_info.ident + ); + // pure historical data. + // errors ignored (since it can be picked up by subsequent purge actions) + match tbl + .navigate_for_purge( &self.ctx, - snapshot_files, - None, - keep_last_snapshot, - false, + Some(NavigationPoint::TimePoint(instant)), + by_pass_retention_period_checking, ) .await { - // Errors of GC, if any, are ignored, since GC task can be picked up - warn!( - "GC of table not success (this is not a permanent error). the error : {}", - e - ); - } else { - info!("GC of table done"); + Ok((table, files)) => { + if let Err(e) = table + .do_purge( + &self.ctx, + files, + None, + keep_last_snapshot, + dry_run, + ) + .await + { + // Errors of GC, if any, are ignored, since GC task can be picked up + warn!("table GC failed (none permanent error) : {}", e); + } else { + info!("table GC done"); + } + } + Err(e) => { + warn!( + "table GC failed (none permanent error) : navigation error {:?}", + e + ); + } } + + // let snapshot_files = tbl.list_snapshot_files().await?; + + // if let Err(e) = tbl + // .do_purge( + // &self.ctx, + // snapshot_files, + // None, + // keep_last_snapshot, + // dry_run, + // ) + // .await + //{ + //} else { + //} } metrics_inc_commit_mutation_success(); { diff --git a/src/query/storages/fuse/src/operations/navigate.rs b/src/query/storages/fuse/src/operations/navigate.rs index 775245efeb792..4afb0903de2ca 100644 --- a/src/query/storages/fuse/src/operations/navigate.rs +++ b/src/query/storages/fuse/src/operations/navigate.rs @@ -199,9 +199,8 @@ impl FuseTable { &self, ctx: &Arc, instant: Option, + by_pass_retention_period_checking: bool, ) -> Result<(Arc, Vec)> { - let retention = - Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64); let root_snapshot = if let Some(snapshot) = self.read_table_snapshot().await? { snapshot } else { @@ -211,11 +210,17 @@ impl FuseTable { }; assert!(root_snapshot.timestamp.is_some()); + let retention = + Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64); let mut time_point = root_snapshot.timestamp.unwrap() - retention; let (location, files) = match instant { Some(NavigationPoint::TimePoint(point)) => { - time_point = std::cmp::min(point, time_point); + time_point = if !by_pass_retention_period_checking { + std::cmp::min(point, time_point) + } else { + point + }; self.list_by_time_point(time_point).await } Some(NavigationPoint::SnapshotID(snapshot_id)) => { From f5412ba17a4fdfce03da5c43ef7dc89b92f86f16 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 26 Apr 2024 15:07:02 +0800 Subject: [PATCH 2/4] fix: description of setting `transient_data_retention_time_in_minutes` --- src/query/settings/src/settings_default.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 054f2e0a3f8b5..0536b3591536a 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -161,7 +161,7 @@ impl DefaultSettings { }), ("transient_data_retention_time_in_minutes", DefaultSettingValue { value: UserSettingValue::UInt64(60), - desc: "Sets the data retention time in minutes.", + desc: "Sets the transient data retention time in minutes.", mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..= 24 * 60)), }), From 730a84c57233b58f5cf9148d446381ec2420378e Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 26 Apr 2024 15:20:38 +0800 Subject: [PATCH 3/4] chore: cleanup --- .../interpreters/interpreter_table_drop.rs | 4 +- src/query/storages/fuse/src/fuse_table.rs | 4 +- .../common/generators/snapshot_generator.rs | 2 +- .../common/generators/truncate_generator.rs | 2 +- .../common/processors/sink_commit.rs | 136 +++++++++--------- .../storages/fuse/src/operations/navigate.rs | 24 ++-- 6 files changed, 89 insertions(+), 83 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_table_drop.rs b/src/query/service/src/interpreters/interpreter_table_drop.rs index c08dc4bbdd405..231f2d1555adc 100644 --- a/src/query/service/src/interpreters/interpreter_table_drop.rs +++ b/src/query/service/src/interpreters/interpreter_table_drop.rs @@ -132,14 +132,14 @@ impl Interpreter for DropTableInterpreter { // thus if we do not refresh the table instance, `truncate` will fail let latest = tbl.as_ref().refresh(self.ctx.as_ref()).await?; let maybe_fuse_table = FuseTable::try_from_table(latest.as_ref()); - // if target table if of type FuseTable, purge its historical data + // if target table is of type FuseTable, purge its historical data // otherwise, plain truncate if let Ok(fuse_table) = maybe_fuse_table { fuse_table .do_truncate( self.ctx.clone(), &mut build_res.main_pipeline, - TruncateMode::Purge, + TruncateMode::DropAllPurge, ) .await? } else { diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 8e6dd3ddee5eb..813b5ce8698ff 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -718,9 +718,9 @@ impl Table for FuseTable { keep_last_snapshot: bool, dry_run: bool, ) -> Result>> { - let by_pass_retention_period_check = false; + let by_pass_retention_check_for_nav_by_time_point = false; match self - .navigate_for_purge(&ctx, instant, by_pass_retention_period_check) + .navigate_for_purge(&ctx, instant, by_pass_retention_check_for_nav_by_time_point) .await { Ok((table, files)) => { diff --git a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs index 440f2cac3b15f..514ecceded635 100644 --- a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs @@ -23,7 +23,7 @@ use databend_storages_common_table_meta::meta::TableSnapshot; use crate::operations::common::ConflictResolveContext; #[async_trait::async_trait] -pub trait SnapshotGenerator { +pub trait SnapshotGenerator: Sync { /// Convert to `Any`, to enable dynamic casting. fn as_any(&self) -> &dyn Any; diff --git a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs index 0f5021db14a59..ebfa872b58592 100644 --- a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs @@ -30,7 +30,7 @@ pub enum TruncateMode { // Delete the data, used for delete operation. Delete, // Truncate and purge the historical data. - Purge, + DropAllPurge, } #[derive(Clone)] diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 429d60a0c8873..b2138b2ebb121 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -19,6 +19,8 @@ use std::time::Instant; use backoff::backoff::Backoff; use backoff::ExponentialBackoff; +use chrono::DateTime; +use chrono::Utc; use databend_common_catalog::lock::Lock; use databend_common_catalog::table::NavigationPoint; use databend_common_catalog::table::Table; @@ -121,7 +123,7 @@ where F: SnapshotGenerator + Send + 'static prev_snapshot_id: Option, deduplicated_label: Option, ) -> Result { - let purge = Self::do_purge(table, &snapshot_gen); + let purge = Self::need_purge(table, &snapshot_gen); Ok(ProcessorPtr::create(Box::new(CommitSink { state: State::None, ctx, @@ -189,7 +191,7 @@ where F: SnapshotGenerator + Send + 'static Ok(Event::Async) } - fn do_purge(table: &FuseTable, snapshot_gen: &F) -> bool { + fn need_purge(table: &FuseTable, snapshot_gen: &F) -> bool { if table.transient() { return true; } @@ -197,10 +199,54 @@ where F: SnapshotGenerator + Send + 'static snapshot_gen .as_any() .downcast_ref::() - .is_some_and(|gen| matches!(gen.mode(), TruncateMode::Purge)) + .is_some_and(|gen| matches!(gen.mode(), TruncateMode::DropAllPurge)) } - fn do_truncate(&self) -> bool { + async fn purge_table(&self, tbl: &FuseTable) -> Result<()> { + let snapshot_files = tbl.list_snapshot_files().await?; + let keep_last_snapshot = true; + let dry_run = false; + tbl.do_purge(&self.ctx, snapshot_files, None, keep_last_snapshot, dry_run) + .await?; + Ok(()) + } + + async fn purge_transient_table( + &self, + tbl: &FuseTable, + snapshot_timestamp: DateTime, + ) -> Result<()> { + let transient_data_retention_minutes = self + .ctx + .get_settings() + .get_transient_data_retention_time_in_minutes()?; + + let instant = + snapshot_timestamp - chrono::Duration::minutes(transient_data_retention_minutes as i64); + + let by_pass_retention_period_checking = true; + let keep_last_snapshot = true; + let dry_run = false; + let (table, candidate_snapshots) = tbl + .navigate_for_purge( + &self.ctx, + Some(NavigationPoint::TimePoint(instant)), + by_pass_retention_period_checking, + ) + .await?; + table + .do_purge( + &self.ctx, + candidate_snapshots, + None, + keep_last_snapshot, + dry_run, + ) + .await?; + Ok(()) + } + + fn need_truncate(&self) -> bool { self.snapshot_gen .as_any() .downcast_ref::() @@ -368,7 +414,7 @@ where F: SnapshotGenerator + Send + 'static .await { Ok(_) => { - if self.do_truncate() { + if self.need_truncate() { catalog .truncate_table(&table_info, TruncateTableReq { table_id: table_info.ident.table_id, @@ -382,74 +428,30 @@ where F: SnapshotGenerator + Send + 'static let latest = self.table.refresh(self.ctx.as_ref()).await?; let tbl = FuseTable::try_from_table(latest.as_ref())?; - let transient_data_retention_minutes = self - .ctx - .get_settings() - .get_transient_data_retention_time_in_minutes()?; - - let instant = snapshot_timestamp - - chrono::Duration::minutes( - transient_data_retention_minutes as i64, - ); - - let keep_last_snapshot = true; - let dry_run = false; - let by_pass_retention_period_checking = true; - info!( "purging historical data. (name{}, id {})", tbl.table_info.name, tbl.table_info.ident ); - // pure historical data. - // errors ignored (since it can be picked up by subsequent purge actions) - match tbl - .navigate_for_purge( - &self.ctx, - Some(NavigationPoint::TimePoint(instant)), - by_pass_retention_period_checking, - ) - .await - { - Ok((table, files)) => { - if let Err(e) = table - .do_purge( - &self.ctx, - files, - None, - keep_last_snapshot, - dry_run, - ) - .await - { - // Errors of GC, if any, are ignored, since GC task can be picked up - warn!("table GC failed (none permanent error) : {}", e); - } else { - info!("table GC done"); - } - } - Err(e) => { - warn!( - "table GC failed (none permanent error) : navigation error {:?}", - e - ); - } - } - // let snapshot_files = tbl.list_snapshot_files().await?; - - // if let Err(e) = tbl - // .do_purge( - // &self.ctx, - // snapshot_files, - // None, - // keep_last_snapshot, - // dry_run, - // ) - // .await - //{ - //} else { - //} + // purge table, swallow errors + let res = if tbl.transient() { + self.purge_transient_table(tbl, snapshot_timestamp).await + } else { + self.purge_table(tbl).await + }; + + match res { + Err(e) => warn!( + "purge table (name{}, id {}) failed (non-permanent error). the error : {}", + tbl.table_info.name, tbl.table_info.ident, e + ), + Ok(()) => info!( + "purge table done. (name{}, id {})", + tbl.table_info.name, tbl.table_info.ident + ), + } } + metrics_inc_commit_mutation_success(); { let elapsed_time = self.start_time.elapsed().as_millis(); diff --git a/src/query/storages/fuse/src/operations/navigate.rs b/src/query/storages/fuse/src/operations/navigate.rs index 4afb0903de2ca..a8bc49fbc6f5c 100644 --- a/src/query/storages/fuse/src/operations/navigate.rs +++ b/src/query/storages/fuse/src/operations/navigate.rs @@ -199,7 +199,7 @@ impl FuseTable { &self, ctx: &Arc, instant: Option, - by_pass_retention_period_checking: bool, + by_pass_retention_check_for_nav_by_time_point: bool, ) -> Result<(Arc, Vec)> { let root_snapshot = if let Some(snapshot) = self.read_table_snapshot().await? { snapshot @@ -212,26 +212,30 @@ impl FuseTable { assert!(root_snapshot.timestamp.is_some()); let retention = Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64); - let mut time_point = root_snapshot.timestamp.unwrap() - retention; + let min_time_point = root_snapshot.timestamp.unwrap() - retention; let (location, files) = match instant { Some(NavigationPoint::TimePoint(point)) => { - time_point = if !by_pass_retention_period_checking { - std::cmp::min(point, time_point) - } else { + let nav_time_point = if by_pass_retention_check_for_nav_by_time_point { point + } else { + std::cmp::max(point, min_time_point) }; - self.list_by_time_point(time_point).await + self.list_by_time_point(nav_time_point).await } Some(NavigationPoint::SnapshotID(snapshot_id)) => { - self.list_by_snapshot_id(snapshot_id.as_str(), time_point) + self.list_by_snapshot_id(snapshot_id.as_str(), min_time_point) .await } - Some(NavigationPoint::StreamInfo(info)) => self.list_by_stream(info, time_point).await, - None => self.list_by_time_point(time_point).await, + Some(NavigationPoint::StreamInfo(info)) => { + self.list_by_stream(info, min_time_point).await + } + None => self.list_by_time_point(min_time_point).await, }?; - let table = self.navigate_to_time_point(location, time_point).await?; + let table = self + .navigate_to_time_point(location, min_time_point) + .await?; Ok((table, files)) } From feb2b345bc76a5ba8aa3d9ed2315b8598ea348b1 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 26 Apr 2024 22:41:15 +0800 Subject: [PATCH 4/4] tweak logic test --- .../common/processors/sink_commit.rs | 23 +++++++++++++++---- .../09_0017_transient_table.test | 5 +++- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index b2138b2ebb121..376bca5f62401 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -221,12 +221,19 @@ where F: SnapshotGenerator + Send + 'static .get_settings() .get_transient_data_retention_time_in_minutes()?; + if transient_data_retention_minutes == 0 { + // if transient_data_retention_time_in_minutes is set to 0, + // fallback to normal purge (which is slightly faster) + return self.purge_table(tbl).await; + } + let instant = snapshot_timestamp - chrono::Duration::minutes(transient_data_retention_minutes as i64); let by_pass_retention_period_checking = true; let keep_last_snapshot = true; let dry_run = false; + let (table, candidate_snapshots) = tbl .navigate_for_purge( &self.ctx, @@ -234,6 +241,7 @@ where F: SnapshotGenerator + Send + 'static by_pass_retention_period_checking, ) .await?; + table .do_purge( &self.ctx, @@ -243,6 +251,7 @@ where F: SnapshotGenerator + Send + 'static dry_run, ) .await?; + Ok(()) } @@ -434,7 +443,8 @@ where F: SnapshotGenerator + Send + 'static ); // purge table, swallow errors - let res = if tbl.transient() { + let table_is_transient = tbl.transient(); + let res = if table_is_transient { self.purge_transient_table(tbl, snapshot_timestamp).await } else { self.purge_table(tbl).await @@ -442,12 +452,15 @@ where F: SnapshotGenerator + Send + 'static match res { Err(e) => warn!( - "purge table (name{}, id {}) failed (non-permanent error). the error : {}", - tbl.table_info.name, tbl.table_info.ident, e + "purge table (name: {}, id: {}, transient: {}) failed (non-permanent error). the error : {}", + tbl.table_info.name, + tbl.table_info.ident, + table_is_transient, + e ), Ok(()) => info!( - "purge table done. (name{}, id {})", - tbl.table_info.name, tbl.table_info.ident + "purge table done. (name: {}, id: {}, transient: {})", + tbl.table_info.name, tbl.table_info.ident, table_is_transient, ), } } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0017_transient_table.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0017_transient_table.test index 50a3db5b52a05..902a950676686 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0017_transient_table.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0017_transient_table.test @@ -10,6 +10,9 @@ USE db1 statement ok CREATE TRANSIENT TABLE IF NOT EXISTS t09_0016(a int) +statement ok +set transient_data_retention_time_in_minutes = 0; + statement ok INSERT INTO t09_0016 VALUES(1) @@ -27,7 +30,7 @@ select * from t09_0016 order by a 3 query B -select count(*)=1 from fuse_snapshot('db1', 't09_0016') +select count(*) from fuse_snapshot('db1', 't09_0016') ---- 1