Skip to content

Commit 730a84c

Browse files
committed
chore: cleanup
1 parent f5412ba commit 730a84c

File tree

6 files changed

+89
-83
lines changed

6 files changed

+89
-83
lines changed

src/query/service/src/interpreters/interpreter_table_drop.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,14 @@ impl Interpreter for DropTableInterpreter {
132132
// thus if we do not refresh the table instance, `truncate` will fail
133133
let latest = tbl.as_ref().refresh(self.ctx.as_ref()).await?;
134134
let maybe_fuse_table = FuseTable::try_from_table(latest.as_ref());
135-
// if target table if of type FuseTable, purge its historical data
135+
// if target table is of type FuseTable, purge its historical data
136136
// otherwise, plain truncate
137137
if let Ok(fuse_table) = maybe_fuse_table {
138138
fuse_table
139139
.do_truncate(
140140
self.ctx.clone(),
141141
&mut build_res.main_pipeline,
142-
TruncateMode::Purge,
142+
TruncateMode::DropAllPurge,
143143
)
144144
.await?
145145
} else {

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -718,9 +718,9 @@ impl Table for FuseTable {
718718
keep_last_snapshot: bool,
719719
dry_run: bool,
720720
) -> Result<Option<Vec<String>>> {
721-
let by_pass_retention_period_check = false;
721+
let by_pass_retention_check_for_nav_by_time_point = false;
722722
match self
723-
.navigate_for_purge(&ctx, instant, by_pass_retention_period_check)
723+
.navigate_for_purge(&ctx, instant, by_pass_retention_check_for_nav_by_time_point)
724724
.await
725725
{
726726
Ok((table, files)) => {

src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
2323
use crate::operations::common::ConflictResolveContext;
2424

2525
#[async_trait::async_trait]
26-
pub trait SnapshotGenerator {
26+
pub trait SnapshotGenerator: Sync {
2727
/// Convert to `Any`, to enable dynamic casting.
2828
fn as_any(&self) -> &dyn Any;
2929

src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub enum TruncateMode {
3030
// Delete the data, used for delete operation.
3131
Delete,
3232
// Truncate and purge the historical data.
33-
Purge,
33+
DropAllPurge,
3434
}
3535

3636
#[derive(Clone)]

src/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 69 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use std::time::Instant;
1919

2020
use backoff::backoff::Backoff;
2121
use backoff::ExponentialBackoff;
22+
use chrono::DateTime;
23+
use chrono::Utc;
2224
use databend_common_catalog::lock::Lock;
2325
use databend_common_catalog::table::NavigationPoint;
2426
use databend_common_catalog::table::Table;
@@ -121,7 +123,7 @@ where F: SnapshotGenerator + Send + 'static
121123
prev_snapshot_id: Option<SnapshotId>,
122124
deduplicated_label: Option<String>,
123125
) -> Result<ProcessorPtr> {
124-
let purge = Self::do_purge(table, &snapshot_gen);
126+
let purge = Self::need_purge(table, &snapshot_gen);
125127
Ok(ProcessorPtr::create(Box::new(CommitSink {
126128
state: State::None,
127129
ctx,
@@ -189,18 +191,62 @@ where F: SnapshotGenerator + Send + 'static
189191
Ok(Event::Async)
190192
}
191193

192-
fn do_purge(table: &FuseTable, snapshot_gen: &F) -> bool {
194+
fn need_purge(table: &FuseTable, snapshot_gen: &F) -> bool {
193195
if table.transient() {
194196
return true;
195197
}
196198

197199
snapshot_gen
198200
.as_any()
199201
.downcast_ref::<TruncateGenerator>()
200-
.is_some_and(|gen| matches!(gen.mode(), TruncateMode::Purge))
202+
.is_some_and(|gen| matches!(gen.mode(), TruncateMode::DropAllPurge))
201203
}
202204

203-
fn do_truncate(&self) -> bool {
205+
async fn purge_table(&self, tbl: &FuseTable) -> Result<()> {
206+
let snapshot_files = tbl.list_snapshot_files().await?;
207+
let keep_last_snapshot = true;
208+
let dry_run = false;
209+
tbl.do_purge(&self.ctx, snapshot_files, None, keep_last_snapshot, dry_run)
210+
.await?;
211+
Ok(())
212+
}
213+
214+
async fn purge_transient_table(
215+
&self,
216+
tbl: &FuseTable,
217+
snapshot_timestamp: DateTime<Utc>,
218+
) -> Result<()> {
219+
let transient_data_retention_minutes = self
220+
.ctx
221+
.get_settings()
222+
.get_transient_data_retention_time_in_minutes()?;
223+
224+
let instant =
225+
snapshot_timestamp - chrono::Duration::minutes(transient_data_retention_minutes as i64);
226+
227+
let by_pass_retention_period_checking = true;
228+
let keep_last_snapshot = true;
229+
let dry_run = false;
230+
let (table, candidate_snapshots) = tbl
231+
.navigate_for_purge(
232+
&self.ctx,
233+
Some(NavigationPoint::TimePoint(instant)),
234+
by_pass_retention_period_checking,
235+
)
236+
.await?;
237+
table
238+
.do_purge(
239+
&self.ctx,
240+
candidate_snapshots,
241+
None,
242+
keep_last_snapshot,
243+
dry_run,
244+
)
245+
.await?;
246+
Ok(())
247+
}
248+
249+
fn need_truncate(&self) -> bool {
204250
self.snapshot_gen
205251
.as_any()
206252
.downcast_ref::<TruncateGenerator>()
@@ -368,7 +414,7 @@ where F: SnapshotGenerator + Send + 'static
368414
.await
369415
{
370416
Ok(_) => {
371-
if self.do_truncate() {
417+
if self.need_truncate() {
372418
catalog
373419
.truncate_table(&table_info, TruncateTableReq {
374420
table_id: table_info.ident.table_id,
@@ -382,74 +428,30 @@ where F: SnapshotGenerator + Send + 'static
382428
let latest = self.table.refresh(self.ctx.as_ref()).await?;
383429
let tbl = FuseTable::try_from_table(latest.as_ref())?;
384430

385-
let transient_data_retention_minutes = self
386-
.ctx
387-
.get_settings()
388-
.get_transient_data_retention_time_in_minutes()?;
389-
390-
let instant = snapshot_timestamp
391-
- chrono::Duration::minutes(
392-
transient_data_retention_minutes as i64,
393-
);
394-
395-
let keep_last_snapshot = true;
396-
let dry_run = false;
397-
let by_pass_retention_period_checking = true;
398-
399431
info!(
400432
"purging historical data. (name{}, id {})",
401433
tbl.table_info.name, tbl.table_info.ident
402434
);
403-
// pure historical data.
404-
// errors ignored (since it can be picked up by subsequent purge actions)
405-
match tbl
406-
.navigate_for_purge(
407-
&self.ctx,
408-
Some(NavigationPoint::TimePoint(instant)),
409-
by_pass_retention_period_checking,
410-
)
411-
.await
412-
{
413-
Ok((table, files)) => {
414-
if let Err(e) = table
415-
.do_purge(
416-
&self.ctx,
417-
files,
418-
None,
419-
keep_last_snapshot,
420-
dry_run,
421-
)
422-
.await
423-
{
424-
// Errors of GC, if any, are ignored, since GC task can be picked up
425-
warn!("table GC failed (none permanent error) : {}", e);
426-
} else {
427-
info!("table GC done");
428-
}
429-
}
430-
Err(e) => {
431-
warn!(
432-
"table GC failed (none permanent error) : navigation error {:?}",
433-
e
434-
);
435-
}
436-
}
437435

438-
// let snapshot_files = tbl.list_snapshot_files().await?;
439-
440-
// if let Err(e) = tbl
441-
// .do_purge(
442-
// &self.ctx,
443-
// snapshot_files,
444-
// None,
445-
// keep_last_snapshot,
446-
// dry_run,
447-
// )
448-
// .await
449-
//{
450-
//} else {
451-
//}
436+
// purge table, swallow errors
437+
let res = if tbl.transient() {
438+
self.purge_transient_table(tbl, snapshot_timestamp).await
439+
} else {
440+
self.purge_table(tbl).await
441+
};
442+
443+
match res {
444+
Err(e) => warn!(
445+
"purge table (name{}, id {}) failed (non-permanent error). the error : {}",
446+
tbl.table_info.name, tbl.table_info.ident, e
447+
),
448+
Ok(()) => info!(
449+
"purge table done. (name{}, id {})",
450+
tbl.table_info.name, tbl.table_info.ident
451+
),
452+
}
452453
}
454+
453455
metrics_inc_commit_mutation_success();
454456
{
455457
let elapsed_time = self.start_time.elapsed().as_millis();

src/query/storages/fuse/src/operations/navigate.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl FuseTable {
199199
&self,
200200
ctx: &Arc<dyn TableContext>,
201201
instant: Option<NavigationPoint>,
202-
by_pass_retention_period_checking: bool,
202+
by_pass_retention_check_for_nav_by_time_point: bool,
203203
) -> Result<(Arc<FuseTable>, Vec<String>)> {
204204
let root_snapshot = if let Some(snapshot) = self.read_table_snapshot().await? {
205205
snapshot
@@ -212,26 +212,30 @@ impl FuseTable {
212212
assert!(root_snapshot.timestamp.is_some());
213213
let retention =
214214
Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64);
215-
let mut time_point = root_snapshot.timestamp.unwrap() - retention;
215+
let min_time_point = root_snapshot.timestamp.unwrap() - retention;
216216

217217
let (location, files) = match instant {
218218
Some(NavigationPoint::TimePoint(point)) => {
219-
time_point = if !by_pass_retention_period_checking {
220-
std::cmp::min(point, time_point)
221-
} else {
219+
let nav_time_point = if by_pass_retention_check_for_nav_by_time_point {
222220
point
221+
} else {
222+
std::cmp::max(point, min_time_point)
223223
};
224-
self.list_by_time_point(time_point).await
224+
self.list_by_time_point(nav_time_point).await
225225
}
226226
Some(NavigationPoint::SnapshotID(snapshot_id)) => {
227-
self.list_by_snapshot_id(snapshot_id.as_str(), time_point)
227+
self.list_by_snapshot_id(snapshot_id.as_str(), min_time_point)
228228
.await
229229
}
230-
Some(NavigationPoint::StreamInfo(info)) => self.list_by_stream(info, time_point).await,
231-
None => self.list_by_time_point(time_point).await,
230+
Some(NavigationPoint::StreamInfo(info)) => {
231+
self.list_by_stream(info, min_time_point).await
232+
}
233+
None => self.list_by_time_point(min_time_point).await,
232234
}?;
233235

234-
let table = self.navigate_to_time_point(location, time_point).await?;
236+
let table = self
237+
.navigate_to_time_point(location, min_time_point)
238+
.await?;
235239

236240
Ok((table, files))
237241
}

0 commit comments

Comments
 (0)