Skip to content

Commit 8c1a25f

Browse files
committed
fix: purge table statistic files
1 parent 094f154 commit 8c1a25f

File tree

5 files changed

+121
-10
lines changed

5 files changed

+121
-10
lines changed

src/query/service/tests/it/storages/fuse/operations/optimize.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,33 @@ async fn test_fuse_snapshot_optimize_statistic() -> Result<()> {
4848
.await
4949
}
5050

51+
#[tokio::test]
52+
async fn test_fuse_snapshot_optimize_statistic_purge() -> Result<()> {
53+
let fixture = TestFixture::new().await;
54+
let db = fixture.default_db_name();
55+
let tbl = fixture.default_table_name();
56+
let case_name = "optimize_statistic_purge";
57+
do_insertions(&fixture).await?;
58+
59+
// optimize statistics twice
60+
for i in 0..1 {
61+
let qry = format!("optimize table {}.{} statistic", db, tbl);
62+
63+
let ctx = fixture.ctx();
64+
execute_command(ctx, &qry).await?;
65+
66+
check_data_dir(&fixture, case_name, 3, 1 + i, 2, 2, 2, Some(())).await?;
67+
}
68+
69+
// After compact, all the count will become 1
70+
let qry = format!("optimize table {}.{} all", db, tbl);
71+
execute_command(fixture.ctx().clone(), &qry).await?;
72+
73+
check_data_dir(&fixture, case_name, 1, 1, 1, 1, 1, Some(())).await?;
74+
75+
Ok(())
76+
}
77+
5178
#[tokio::test]
5279
async fn test_fuse_snapshot_optimize_all() -> Result<()> {
5380
do_purge_test("explicit pure", "all", 1, 0, 1, 1, 1, None).await

src/query/service/tests/it/storages/fuse/table_test_fixture.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,12 @@ pub async fn check_data_dir(
483483
let prefix_block = FUSE_TBL_BLOCK_PREFIX;
484484
let prefix_index = FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
485485
let prefix_last_snapshot_hint = FUSE_TBL_LAST_SNAPSHOT_HINT;
486+
println!("root: {}", root);
486487
for entry in WalkDir::new(root) {
487488
let entry = entry.unwrap();
488489
if entry.file_type().is_file() {
489490
let (_, entry_path) = entry.path().to_str().unwrap().split_at(root.len());
491+
println!("entry: {}", entry_path);
490492
// trim the leading prefix, e.g. "/db_id/table_id/"
491493
let path = entry_path.split('/').skip(3).collect::<Vec<_>>();
492494
let path = path[0];
@@ -499,6 +501,7 @@ pub async fn check_data_dir(
499501
} else if path.starts_with(prefix_index) {
500502
i_count += 1;
501503
} else if path.starts_with(prefix_snapshot_statistics) {
504+
println!("ts file:{}", entry_path);
502505
ts_count += 1;
503506
} else if path.starts_with(prefix_last_snapshot_hint) && check_last_snapshot.is_some() {
504507
let content = fixture

src/query/storages/fuse/fuse/src/io/snapshots.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,21 @@ impl SnapshotsIO {
102102
.map_err(|e| ErrorCode::StorageOther(format!("read snapshots failure, {}", e)))
103103
}
104104

105+
// Read all the snapshots by the root file.
106+
// limit: read how many snapshot files
107+
// with_segment_locations: if true will get the segments of the snapshot
108+
pub async fn read_table_statistic_files(
109+
&self,
110+
root_ts_file: &str,
111+
limit: Option<usize>,
112+
) -> Result<Vec<String>> {
113+
// Get all file list.
114+
if let Some(prefix) = Self::get_s3_prefix_from_file(root_ts_file) {
115+
return self.get_files(&prefix, limit, Some(root_ts_file)).await;
116+
}
117+
Ok(vec![])
118+
}
119+
105120
// Read all the snapshots by the root file.
106121
// limit: read how many snapshot files
107122
// with_segment_locations: if true will get the segments of the snapshot
@@ -123,7 +138,7 @@ impl SnapshotsIO {
123138
let mut snapshot_files = vec![];
124139
let mut segment_locations = HashSet::new();
125140
if let Some(prefix) = Self::get_s3_prefix_from_file(&root_snapshot_file) {
126-
snapshot_files = self.get_files(&prefix, limit).await?;
141+
snapshot_files = self.get_files(&prefix, limit, None).await?;
127142
}
128143

129144
// 1. Get all the snapshot by chunks.
@@ -199,14 +214,22 @@ impl SnapshotsIO {
199214
Ok((snapshot_chain, segment_locations))
200215
}
201216

202-
async fn get_files(&self, prefix: &str, limit: Option<usize>) -> Result<Vec<String>> {
217+
async fn get_files(
218+
&self,
219+
prefix: &str,
220+
limit: Option<usize>,
221+
exclude_file: Option<&str>,
222+
) -> Result<Vec<String>> {
203223
let data_accessor = self.operator.clone();
204224

205225
let mut file_list = vec![];
206226
let mut ds = data_accessor.object(prefix).list().await?;
207227
while let Some(de) = ds.try_next().await? {
208228
match de.mode().await? {
209229
ObjectMode::FILE => {
230+
if exclude_file.is_some() && Some(de.path()) == exclude_file {
231+
continue;
232+
}
210233
let location = de.path().to_string();
211234
let modified = de.last_modified().await?;
212235
file_list.push((location, modified));

src/query/storages/fuse/fuse/src/operations/gc.rs

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,20 @@ impl FuseTable {
5858
// 1. Root snapshot.
5959
let mut segments_referenced_by_root = HashSet::new();
6060
let mut locations_referenced_by_root = Default::default();
61-
let (root_snapshot_id, root_snapshot_ts) = if let Some(root_snapshot) = snapshot_opt {
62-
let segments = root_snapshot.segments.clone();
63-
locations_referenced_by_root = self.get_block_locations(ctx.clone(), &segments).await?;
64-
segments_referenced_by_root = HashSet::from_iter(segments);
65-
(root_snapshot.snapshot_id, root_snapshot.timestamp)
66-
} else {
67-
(SnapshotId::new_v4(), None)
68-
};
61+
let (root_snapshot_id, root_snapshot_ts, root_ts_location_opt) =
62+
if let Some(ref root_snapshot) = snapshot_opt {
63+
let segments = root_snapshot.segments.clone();
64+
locations_referenced_by_root =
65+
self.get_block_locations(ctx.clone(), &segments).await?;
66+
segments_referenced_by_root = HashSet::from_iter(segments);
67+
(
68+
root_snapshot.snapshot_id,
69+
root_snapshot.timestamp,
70+
root_snapshot.table_statistics_location.clone(),
71+
)
72+
} else {
73+
(SnapshotId::new_v4(), None, None)
74+
};
6975

7076
// 2. Get all snapshot(including root snapshot).
7177
let mut all_snapshot_lites = vec![];
@@ -108,6 +114,7 @@ impl FuseTable {
108114
// 3. Find.
109115
let mut snapshots_to_be_purged = HashSet::new();
110116
let mut segments_to_be_purged = HashSet::new();
117+
let ts_to_be_purged: Vec<String> = vec![];
111118

112119
// 3.1 Find all the snapshots need to be deleted.
113120
{
@@ -131,6 +138,29 @@ impl FuseTable {
131138
}
132139
}
133140

141+
// 3.3 Find all the table statistic files need to be deleted
142+
{
143+
if let Some(root_ts_location) = root_ts_location_opt {
144+
let start = Instant::now();
145+
let snapshots_io = SnapshotsIO::create(
146+
ctx.clone(),
147+
self.operator.clone(),
148+
self.snapshot_format_version().await?,
149+
);
150+
let ts_to_be_purged = snapshots_io
151+
.read_table_statistic_files(&root_ts_location, None)
152+
.await?;
153+
let status_ts_scan_count = ts_to_be_purged.len();
154+
let status_ts_scan_cost = start.elapsed().as_secs();
155+
let status = format!(
156+
"gc: scan table statistic files:{} takes:{} sec.",
157+
status_ts_scan_count, status_ts_scan_cost,
158+
);
159+
self.data_metrics.set_status(&status);
160+
info!(status);
161+
}
162+
}
163+
134164
let chunk_size = ctx.get_settings().get_max_storage_io_requests()? as usize;
135165

136166
// 4. Purge segments&blocks by chunk size
@@ -240,6 +270,33 @@ impl FuseTable {
240270
}
241271
}
242272

273+
// 6. Purge table statistic files
274+
{
275+
let mut status_purged_count = 0;
276+
let status_need_purged_count = ts_to_be_purged.len();
277+
let start = Instant::now();
278+
for chunk in ts_to_be_purged.chunks(chunk_size) {
279+
let mut ts_locations_to_be_purged = HashSet::new();
280+
for file in chunk {
281+
ts_locations_to_be_purged.insert(file.clone());
282+
}
283+
self.try_purge_location_files(ctx.clone(), ts_locations_to_be_purged)
284+
.await?;
285+
// Refresh status.
286+
{
287+
status_purged_count += chunk.len();
288+
let status = format!(
289+
"gc: table statistic files need to be purged:{}, have purged:{}, take:{} sec",
290+
status_need_purged_count,
291+
status_purged_count,
292+
start.elapsed().as_secs()
293+
);
294+
self.data_metrics.set_status(&status);
295+
info!(status);
296+
}
297+
}
298+
}
299+
243300
Ok(())
244301
}
245302

src/query/storages/fuse/fuse/src/operations/statistic.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ impl FuseTable {
9292
table_statistics.format_version(),
9393
)?;
9494

95+
println!("ts location: {:?}", table_statistics);
9596
// 4. Save table statistics
9697
let mut new_snapshot = TableSnapshot::from_previous(&snapshot);
9798
new_snapshot.table_statistics_location = Some(table_statistics_location);

0 commit comments

Comments
 (0)