diff --git a/src/query/service/src/interpreters/interpreter_table_drop.rs b/src/query/service/src/interpreters/interpreter_table_drop.rs index c08dc4bbdd405..231f2d1555adc 100644 --- a/src/query/service/src/interpreters/interpreter_table_drop.rs +++ b/src/query/service/src/interpreters/interpreter_table_drop.rs @@ -132,14 +132,14 @@ impl Interpreter for DropTableInterpreter { // thus if we do not refresh the table instance, `truncate` will fail let latest = tbl.as_ref().refresh(self.ctx.as_ref()).await?; let maybe_fuse_table = FuseTable::try_from_table(latest.as_ref()); - // if target table if of type FuseTable, purge its historical data + // if target table is of type FuseTable, purge its historical data // otherwise, plain truncate if let Ok(fuse_table) = maybe_fuse_table { fuse_table .do_truncate( self.ctx.clone(), &mut build_res.main_pipeline, - TruncateMode::Purge, + TruncateMode::DropAllPurge, ) .await? } else { diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 265ebf4d4171f..0536b3591536a 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -159,6 +159,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=data_retention_time_in_days_max)), }), + ("transient_data_retention_time_in_minutes", DefaultSettingValue { + value: UserSettingValue::UInt64(60), + desc: "Sets the transient data retention time in minutes.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..= 24 * 60)), + }), ("max_storage_io_requests", DefaultSettingValue { value: UserSettingValue::UInt64(default_max_storage_io_requests), desc: "Sets the maximum number of concurrent I/O requests.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 16c957004fc48..f27e772063a0e 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -183,6 +183,10 @@ impl Settings { self.try_get_u64("data_retention_time_in_days") } + pub fn get_transient_data_retention_time_in_minutes(&self) -> Result { + self.try_get_u64("transient_data_retention_time_in_minutes") + } + pub fn get_max_storage_io_requests(&self) -> Result { self.try_get_u64("max_storage_io_requests") } diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 29e8b910ec37e..813b5ce8698ff 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -718,7 +718,11 @@ impl Table for FuseTable { keep_last_snapshot: bool, dry_run: bool, ) -> Result>> { - match self.navigate_for_purge(&ctx, instant).await { + let by_pass_retention_check_for_nav_by_time_point = false; + match self + .navigate_for_purge(&ctx, instant, by_pass_retention_check_for_nav_by_time_point) + .await + { Ok((table, files)) => { table .do_purge(&ctx, files, num_snapshot_limit, keep_last_snapshot, dry_run) diff --git a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs index 440f2cac3b15f..514ecceded635 100644 --- a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs @@ -23,7 +23,7 @@ use databend_storages_common_table_meta::meta::TableSnapshot; use crate::operations::common::ConflictResolveContext; #[async_trait::async_trait] -pub trait SnapshotGenerator { +pub trait SnapshotGenerator: Sync { /// Convert to `Any`, to enable dynamic casting. fn as_any(&self) -> &dyn Any; diff --git a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs index 0f5021db14a59..ebfa872b58592 100644 --- a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs @@ -30,7 +30,7 @@ pub enum TruncateMode { // Delete the data, used for delete operation. Delete, // Truncate and purge the historical data. - Purge, + DropAllPurge, } #[derive(Clone)] diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 6061b44dfeb3b..376bca5f62401 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -19,7 +19,10 @@ use std::time::Instant; use backoff::backoff::Backoff; use backoff::ExponentialBackoff; +use chrono::DateTime; +use chrono::Utc; use databend_common_catalog::lock::Lock; +use databend_common_catalog::table::NavigationPoint; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableExt; use databend_common_catalog::table_context::TableContext; @@ -120,7 +123,7 @@ where F: SnapshotGenerator + Send + 'static prev_snapshot_id: Option, deduplicated_label: Option, ) -> Result { - let purge = Self::do_purge(table, &snapshot_gen); + let purge = Self::need_purge(table, &snapshot_gen); Ok(ProcessorPtr::create(Box::new(CommitSink { state: State::None, ctx, @@ -188,7 +191,7 @@ where F: SnapshotGenerator + Send + 'static Ok(Event::Async) } - fn do_purge(table: &FuseTable, snapshot_gen: &F) -> bool { + fn need_purge(table: &FuseTable, snapshot_gen: &F) -> bool { if table.transient() { return true; } @@ -196,10 +199,63 @@ where F: SnapshotGenerator + Send + 'static snapshot_gen .as_any() .downcast_ref::() - .is_some_and(|gen| matches!(gen.mode(), TruncateMode::Purge)) + .is_some_and(|gen| matches!(gen.mode(), TruncateMode::DropAllPurge)) } - fn do_truncate(&self) -> bool { + async fn purge_table(&self, tbl: &FuseTable) -> Result<()> { + let snapshot_files = tbl.list_snapshot_files().await?; + let keep_last_snapshot = true; + let dry_run = false; + tbl.do_purge(&self.ctx, snapshot_files, None, keep_last_snapshot, dry_run) + .await?; + Ok(()) + } + + async fn purge_transient_table( + &self, + tbl: &FuseTable, + snapshot_timestamp: DateTime, + ) -> Result<()> { + let transient_data_retention_minutes = self + .ctx + .get_settings() + .get_transient_data_retention_time_in_minutes()?; + + if transient_data_retention_minutes == 0 { + // if transient_data_retention_time_in_minutes is set to 0, + // fallback to normal purge (which is slightly faster) + return self.purge_table(tbl).await; + } + + let instant = + snapshot_timestamp - chrono::Duration::minutes(transient_data_retention_minutes as i64); + + let by_pass_retention_period_checking = true; + let keep_last_snapshot = true; + let dry_run = false; + + let (table, candidate_snapshots) = tbl + .navigate_for_purge( + &self.ctx, + Some(NavigationPoint::TimePoint(instant)), + by_pass_retention_period_checking, + ) + .await?; + + table + .do_purge( + &self.ctx, + candidate_snapshots, + None, + keep_last_snapshot, + dry_run, + ) + .await?; + + Ok(()) + } + + fn need_truncate(&self) -> bool { self.snapshot_gen .as_any() .downcast_ref::() @@ -351,6 +407,7 @@ where F: SnapshotGenerator + Send + 'static self.dal.write(&location, data).await?; + let snapshot_timestamp = snapshot.timestamp.unwrap(); let catalog = self.ctx.get_catalog(table_info.catalog()).await?; match FuseTable::update_table_meta( catalog.clone(), @@ -366,7 +423,7 @@ where F: SnapshotGenerator + Send + 'static .await { Ok(_) => { - if self.do_truncate() { + if self.need_truncate() { catalog .truncate_table(&table_info, TruncateTableReq { table_id: table_info.ident.table_id, @@ -380,32 +437,34 @@ where F: SnapshotGenerator + Send + 'static let latest = self.table.refresh(self.ctx.as_ref()).await?; let tbl = FuseTable::try_from_table(latest.as_ref())?; - warn!( - "table detected, purging historical data. ({})", - tbl.table_info.ident + info!( + "purging historical data. (name{}, id {})", + tbl.table_info.name, tbl.table_info.ident ); - let keep_last_snapshot = true; - let snapshot_files = tbl.list_snapshot_files().await?; - if let Err(e) = tbl - .do_purge( - &self.ctx, - snapshot_files, - None, - keep_last_snapshot, - false, - ) - .await - { - // Errors of GC, if any, are ignored, since GC task can be picked up - warn!( - "GC of table not success (this is not a permanent error). the error : {}", - e - ); + // purge table, swallow errors + let table_is_transient = tbl.transient(); + let res = if table_is_transient { + self.purge_transient_table(tbl, snapshot_timestamp).await } else { - info!("GC of table done"); + self.purge_table(tbl).await + }; + + match res { + Err(e) => warn!( + "purge table (name: {}, id: {}, transient: {}) failed (non-permanent error). the error : {}", + tbl.table_info.name, + tbl.table_info.ident, + table_is_transient, + e + ), + Ok(()) => info!( + "purge table done. (name: {}, id: {}, transient: {})", + tbl.table_info.name, tbl.table_info.ident, table_is_transient, + ), } } + metrics_inc_commit_mutation_success(); { let elapsed_time = self.start_time.elapsed().as_millis(); diff --git a/src/query/storages/fuse/src/operations/navigate.rs b/src/query/storages/fuse/src/operations/navigate.rs index 775245efeb792..a8bc49fbc6f5c 100644 --- a/src/query/storages/fuse/src/operations/navigate.rs +++ b/src/query/storages/fuse/src/operations/navigate.rs @@ -199,9 +199,8 @@ impl FuseTable { &self, ctx: &Arc, instant: Option, + by_pass_retention_check_for_nav_by_time_point: bool, ) -> Result<(Arc, Vec)> { - let retention = - Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64); let root_snapshot = if let Some(snapshot) = self.read_table_snapshot().await? { snapshot } else { @@ -211,22 +210,32 @@ impl FuseTable { }; assert!(root_snapshot.timestamp.is_some()); - let mut time_point = root_snapshot.timestamp.unwrap() - retention; + let retention = + Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64); + let min_time_point = root_snapshot.timestamp.unwrap() - retention; let (location, files) = match instant { Some(NavigationPoint::TimePoint(point)) => { - time_point = std::cmp::min(point, time_point); - self.list_by_time_point(time_point).await + let nav_time_point = if by_pass_retention_check_for_nav_by_time_point { + point + } else { + std::cmp::max(point, min_time_point) + }; + self.list_by_time_point(nav_time_point).await } Some(NavigationPoint::SnapshotID(snapshot_id)) => { - self.list_by_snapshot_id(snapshot_id.as_str(), time_point) + self.list_by_snapshot_id(snapshot_id.as_str(), min_time_point) .await } - Some(NavigationPoint::StreamInfo(info)) => self.list_by_stream(info, time_point).await, - None => self.list_by_time_point(time_point).await, + Some(NavigationPoint::StreamInfo(info)) => { + self.list_by_stream(info, min_time_point).await + } + None => self.list_by_time_point(min_time_point).await, }?; - let table = self.navigate_to_time_point(location, time_point).await?; + let table = self + .navigate_to_time_point(location, min_time_point) + .await?; Ok((table, files)) } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0017_transient_table.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0017_transient_table.test index 50a3db5b52a05..902a950676686 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0017_transient_table.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0017_transient_table.test @@ -10,6 +10,9 @@ USE db1 statement ok CREATE TRANSIENT TABLE IF NOT EXISTS t09_0016(a int) +statement ok +set transient_data_retention_time_in_minutes = 0; + statement ok INSERT INTO t09_0016 VALUES(1) @@ -27,7 +30,7 @@ select * from t09_0016 order by a 3 query B -select count(*)=1 from fuse_snapshot('db1', 't09_0016') +select count(*) from fuse_snapshot('db1', 't09_0016') ---- 1