Skip to content

Commit 9238c0a

Browse files
committed
chore: tweak transient table data retention policy
- introduce new setting `transient_data_retention_time_in_minutes` which set the retention period (in minutes) of transient table - during purging data of transient table use the value of setting `transient_data_retention_time_in_minutes` in table navigation
1 parent 8e67459 commit 9238c0a

File tree

5 files changed

+85
-22
lines changed

5 files changed

+85
-22
lines changed

src/query/settings/src/settings_default.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ impl DefaultSettings {
159159
mode: SettingMode::Both,
160160
range: Some(SettingRange::Numeric(0..=data_retention_time_in_days_max)),
161161
}),
162+
("transient_data_retention_time_in_minutes", DefaultSettingValue {
163+
value: UserSettingValue::UInt64(60),
164+
desc: "Sets the data retention time in minutes.",
165+
mode: SettingMode::Both,
166+
range: Some(SettingRange::Numeric(0..= 24 * 60)),
167+
}),
162168
("max_storage_io_requests", DefaultSettingValue {
163169
value: UserSettingValue::UInt64(default_max_storage_io_requests),
164170
desc: "Sets the maximum number of concurrent I/O requests.",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ impl Settings {
183183
self.try_get_u64("data_retention_time_in_days")
184184
}
185185

186+
pub fn get_transient_data_retention_time_in_minutes(&self) -> Result<u64> {
187+
self.try_get_u64("transient_data_retention_time_in_minutes")
188+
}
189+
186190
pub fn get_max_storage_io_requests(&self) -> Result<u64> {
187191
self.try_get_u64("max_storage_io_requests")
188192
}

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,11 @@ impl Table for FuseTable {
718718
keep_last_snapshot: bool,
719719
dry_run: bool,
720720
) -> Result<Option<Vec<String>>> {
721-
match self.navigate_for_purge(&ctx, instant).await {
721+
let by_pass_retention_period_check = false;
722+
match self
723+
.navigate_for_purge(&ctx, instant, by_pass_retention_period_check)
724+
.await
725+
{
722726
Ok((table, files)) => {
723727
table
724728
.do_purge(&ctx, files, num_snapshot_limit, keep_last_snapshot, dry_run)

src/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::time::Instant;
2020
use backoff::backoff::Backoff;
2121
use backoff::ExponentialBackoff;
2222
use databend_common_catalog::lock::Lock;
23+
use databend_common_catalog::table::NavigationPoint;
2324
use databend_common_catalog::table::Table;
2425
use databend_common_catalog::table::TableExt;
2526
use databend_common_catalog::table_context::TableContext;
@@ -351,6 +352,7 @@ where F: SnapshotGenerator + Send + 'static
351352

352353
self.dal.write(&location, data).await?;
353354

355+
let snapshot_timestamp = snapshot.timestamp.unwrap();
354356
let catalog = self.ctx.get_catalog(table_info.catalog()).await?;
355357
match FuseTable::update_table_meta(
356358
catalog.clone(),
@@ -380,31 +382,73 @@ where F: SnapshotGenerator + Send + 'static
380382
let latest = self.table.refresh(self.ctx.as_ref()).await?;
381383
let tbl = FuseTable::try_from_table(latest.as_ref())?;
382384

383-
warn!(
384-
"table detected, purging historical data. ({})",
385-
tbl.table_info.ident
386-
);
385+
let transient_data_retention_minutes = self
386+
.ctx
387+
.get_settings()
388+
.get_transient_data_retention_time_in_minutes()?;
389+
390+
let instant = snapshot_timestamp
391+
- chrono::Duration::minutes(
392+
transient_data_retention_minutes as i64,
393+
);
387394

388395
let keep_last_snapshot = true;
389-
let snapshot_files = tbl.list_snapshot_files().await?;
390-
if let Err(e) = tbl
391-
.do_purge(
396+
let dry_run = false;
397+
let by_pass_retention_period_checking = true;
398+
399+
info!(
400+
"purging historical data. (name{}, id {})",
401+
tbl.table_info.name, tbl.table_info.ident
402+
);
403+
// pure historical data.
404+
// errors ignored (since it can be picked up by subsequent purge actions)
405+
match tbl
406+
.navigate_for_purge(
392407
&self.ctx,
393-
snapshot_files,
394-
None,
395-
keep_last_snapshot,
396-
false,
408+
Some(NavigationPoint::TimePoint(instant)),
409+
by_pass_retention_period_checking,
397410
)
398411
.await
399412
{
400-
// Errors of GC, if any, are ignored, since GC task can be picked up
401-
warn!(
402-
"GC of table not success (this is not a permanent error). the error : {}",
403-
e
404-
);
405-
} else {
406-
info!("GC of table done");
413+
Ok((table, files)) => {
414+
if let Err(e) = table
415+
.do_purge(
416+
&self.ctx,
417+
files,
418+
None,
419+
keep_last_snapshot,
420+
dry_run,
421+
)
422+
.await
423+
{
424+
// Errors of GC, if any, are ignored, since GC task can be picked up
425+
warn!("table GC failed (none permanent error) : {}", e);
426+
} else {
427+
info!("table GC done");
428+
}
429+
}
430+
Err(e) => {
431+
warn!(
432+
"table GC failed (none permanent error) : navigation error {:?}",
433+
e
434+
);
435+
}
407436
}
437+
438+
// let snapshot_files = tbl.list_snapshot_files().await?;
439+
440+
// if let Err(e) = tbl
441+
// .do_purge(
442+
// &self.ctx,
443+
// snapshot_files,
444+
// None,
445+
// keep_last_snapshot,
446+
// dry_run,
447+
// )
448+
// .await
449+
//{
450+
//} else {
451+
//}
408452
}
409453
metrics_inc_commit_mutation_success();
410454
{

src/query/storages/fuse/src/operations/navigate.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,8 @@ impl FuseTable {
199199
&self,
200200
ctx: &Arc<dyn TableContext>,
201201
instant: Option<NavigationPoint>,
202+
by_pass_retention_period_checking: bool,
202203
) -> Result<(Arc<FuseTable>, Vec<String>)> {
203-
let retention =
204-
Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64);
205204
let root_snapshot = if let Some(snapshot) = self.read_table_snapshot().await? {
206205
snapshot
207206
} else {
@@ -211,11 +210,17 @@ impl FuseTable {
211210
};
212211

213212
assert!(root_snapshot.timestamp.is_some());
213+
let retention =
214+
Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64);
214215
let mut time_point = root_snapshot.timestamp.unwrap() - retention;
215216

216217
let (location, files) = match instant {
217218
Some(NavigationPoint::TimePoint(point)) => {
218-
time_point = std::cmp::min(point, time_point);
219+
time_point = if !by_pass_retention_period_checking {
220+
std::cmp::min(point, time_point)
221+
} else {
222+
point
223+
};
219224
self.list_by_time_point(time_point).await
220225
}
221226
Some(NavigationPoint::SnapshotID(snapshot_id)) => {

0 commit comments

Comments
 (0)