Skip to content

Commit 5506ec7

Browse files
authored
refactor(query): enable little vacuum after query finish (#15434)
* refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish * refactor(query): enable little vacuum after query finish
1 parent 8a75f9d commit 5506ec7

File tree

20 files changed

+304
-43
lines changed

20 files changed

+304
-43
lines changed

src/query/ee/src/storages/fuse/operations/handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl VacuumHandler for RealVacuumHandler {
5757
temporary_dir: String,
5858
retain: Option<Duration>,
5959
vacuum_limit: Option<usize>,
60-
) -> Result<Vec<String>> {
60+
) -> Result<usize> {
6161
do_vacuum_temporary_files(temporary_dir, retain, vacuum_limit).await
6262
}
6363
}

src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs

Lines changed: 156 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,50 +13,187 @@
1313
// limitations under the License.
1414

1515
use std::time::Duration;
16+
use std::time::Instant;
1617
use std::time::SystemTime;
1718
use std::time::UNIX_EPOCH;
1819

1920
use databend_common_exception::Result;
2021
use databend_common_storage::DataOperator;
22+
use futures_util::stream;
2123
use futures_util::TryStreamExt;
24+
use log::info;
25+
use opendal::Entry;
26+
use opendal::EntryMode;
2227
use opendal::Metakey;
2328

2429
#[async_backtrace::framed]
2530
pub async fn do_vacuum_temporary_files(
2631
temporary_dir: String,
2732
retain: Option<Duration>,
2833
limit: Option<usize>,
29-
) -> Result<Vec<String>> {
34+
) -> Result<usize> {
35+
let limit = limit.unwrap_or(usize::MAX);
36+
let expire_time = retain
37+
.map(|x| x.as_millis())
38+
.unwrap_or(1000 * 60 * 60 * 24 * 3) as i64;
39+
let timestamp = SystemTime::now()
40+
.duration_since(UNIX_EPOCH)
41+
.unwrap()
42+
.as_millis() as i64;
43+
3044
let operator = DataOperator::instance().operator();
3145

46+
let temporary_dir = format!("{}/", temporary_dir);
47+
3248
let mut ds = operator
3349
.lister_with(&temporary_dir)
34-
.recursive(true)
35-
.metakey(Metakey::LastModified)
50+
.metakey(Metakey::Mode | Metakey::LastModified)
3651
.await?;
3752

38-
let limit = limit.unwrap_or(usize::MAX);
39-
let expire_time = retain.map(|x| x.as_millis()).unwrap_or(60 * 60 * 24 * 3) as i64;
40-
let timestamp = SystemTime::now()
41-
.duration_since(UNIX_EPOCH)
42-
.unwrap()
43-
.as_millis() as i64;
53+
let mut removed_temp_files = 0;
4454

45-
let mut remove_temp_files_name = Vec::new();
46-
while let Some(de) = ds.try_next().await? {
47-
let meta = de.metadata();
55+
while removed_temp_files < limit {
56+
let instant = Instant::now();
57+
let mut end_of_stream = true;
58+
let mut remove_temp_files_path = Vec::with_capacity(1000);
4859

49-
if let Some(modified) = meta.last_modified() {
50-
if timestamp - modified.timestamp_millis() >= expire_time {
51-
operator.delete(de.path()).await?;
52-
remove_temp_files_name.push(de.name().to_string());
60+
while let Some(de) = ds.try_next().await? {
61+
let meta = de.metadata();
62+
63+
match meta.mode() {
64+
EntryMode::DIR => {
65+
let life_mills =
66+
match operator.is_exist(&format!("{}finished", de.path())).await? {
67+
true => 0,
68+
false => expire_time,
69+
};
70+
71+
vacuum_finished_query(
72+
&mut removed_temp_files,
73+
&de,
74+
limit,
75+
timestamp,
76+
life_mills,
77+
)
78+
.await?;
79+
80+
if removed_temp_files >= limit {
81+
end_of_stream = false;
82+
break;
83+
}
84+
}
85+
EntryMode::FILE => {
86+
if let Some(modified) = meta.last_modified() {
87+
if timestamp - modified.timestamp_millis() >= expire_time {
88+
removed_temp_files += 1;
89+
remove_temp_files_path.push(de.path().to_string());
90+
91+
if removed_temp_files >= limit || remove_temp_files_path.len() >= 1000 {
92+
end_of_stream = false;
93+
break;
94+
}
95+
}
96+
}
97+
}
98+
EntryMode::Unknown => unreachable!(),
5399
}
100+
}
101+
102+
if !remove_temp_files_path.is_empty() {
103+
let cur_removed = remove_temp_files_path.len();
104+
operator
105+
.remove_via(stream::iter(remove_temp_files_path))
106+
.await?;
107+
108+
info!(
109+
"vacuum removed {} temp files in {:?}(elapsed: {:?})",
110+
cur_removed,
111+
temporary_dir,
112+
instant.elapsed()
113+
);
114+
}
115+
116+
if end_of_stream {
117+
break;
118+
}
119+
}
54120

55-
if remove_temp_files_name.len() >= limit {
56-
break;
121+
Ok(removed_temp_files)
122+
}
123+
124+
async fn vacuum_finished_query(
125+
removed_temp_files: &mut usize,
126+
de: &Entry,
127+
limit: usize,
128+
timestamp: i64,
129+
life_mills: i64,
130+
) -> Result<()> {
131+
let operator = DataOperator::instance().operator();
132+
133+
let mut all_files_removed = true;
134+
let mut ds = operator
135+
.lister_with(de.path())
136+
.metakey(Metakey::Mode | Metakey::LastModified)
137+
.await?;
138+
139+
while *removed_temp_files < limit {
140+
let instant = Instant::now();
141+
142+
let mut end_of_stream = true;
143+
let mut all_each_files_removed = true;
144+
let mut remove_temp_files_path = Vec::with_capacity(1001);
145+
146+
while let Some(de) = ds.try_next().await? {
147+
let meta = de.metadata();
148+
if meta.is_file() {
149+
if de.name() == "finished" {
150+
continue;
151+
}
152+
153+
if let Some(modified) = meta.last_modified() {
154+
if timestamp - modified.timestamp_millis() >= life_mills {
155+
*removed_temp_files += 1;
156+
remove_temp_files_path.push(de.path().to_string());
157+
158+
if *removed_temp_files >= limit || remove_temp_files_path.len() >= 1000 {
159+
end_of_stream = false;
160+
break;
161+
}
162+
163+
continue;
164+
}
165+
}
57166
}
167+
168+
all_each_files_removed = false;
58169
}
170+
171+
all_files_removed &= all_each_files_removed;
172+
173+
if !remove_temp_files_path.is_empty() {
174+
let cur_removed = remove_temp_files_path.len();
175+
176+
operator
177+
.remove_via(stream::iter(remove_temp_files_path))
178+
.await?;
179+
180+
info!(
181+
"vacuum removed {} temp files in {:?}(elapsed: {:?})",
182+
cur_removed,
183+
de.path(),
184+
instant.elapsed()
185+
);
186+
}
187+
188+
if end_of_stream {
189+
break;
190+
}
191+
}
192+
193+
if all_files_removed {
194+
operator.delete(&format!("{}finished", de.path())).await?;
195+
operator.delete(de.path()).await?;
59196
}
60197

61-
Ok(remove_temp_files_name)
198+
Ok(())
62199
}

src/query/ee/tests/it/storages/fuse/operations/vacuum.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
117117
operator.write("test_dir/test2", vec![1, 2]).await?;
118118
operator.write("test_dir/test3", vec![1, 2]).await?;
119119

120-
assert_eq!(3, operator.list("test_dir/").await?.len());
120+
assert_eq!(
121+
3,
122+
operator.list_with("test_dir/").recursive(true).await?.len()
123+
);
121124

122125
tokio::time::sleep(Duration::from_secs(2)).await;
123126
do_vacuum_temporary_files(
@@ -129,8 +132,23 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
129132

130133
assert_eq!(2, operator.list("test_dir/").await?.len());
131134

132-
do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), None).await?;
133-
assert_eq!(0, operator.list("test_dir/").await?.len());
135+
operator.write("test_dir/test4/test4", vec![1, 2]).await?;
136+
operator.write("test_dir/test5/test5", vec![1, 2]).await?;
137+
operator
138+
.write("test_dir/test5/finished", vec![1, 2])
139+
.await?;
140+
141+
do_vacuum_temporary_files(
142+
"test_dir/".to_string(),
143+
Some(Duration::from_secs(2)),
144+
Some(2),
145+
)
146+
.await?;
147+
assert_eq!(operator.list("test_dir/").await?.len(), 2);
148+
149+
tokio::time::sleep(Duration::from_secs(3)).await;
150+
do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(3)), None).await?;
151+
assert!(operator.list_with("test_dir/").await?.is_empty());
134152

135153
Ok(())
136154
}

src/query/ee_features/vacuum_handler/src/vacuum_handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub trait VacuumHandler: Sync + Send {
4747
temporary_dir: String,
4848
retain: Option<Duration>,
4949
vacuum_limit: Option<usize>,
50-
) -> Result<Vec<String>>;
50+
) -> Result<usize>;
5151
}
5252

5353
pub struct VacuumHandlerWrapper {
@@ -89,7 +89,7 @@ impl VacuumHandlerWrapper {
8989
temporary_dir: String,
9090
retain: Option<Duration>,
9191
vacuum_limit: Option<usize>,
92-
) -> Result<Vec<String>> {
92+
) -> Result<usize> {
9393
self.handler
9494
.do_vacuum_temporary_files(temporary_dir, retain, vacuum_limit)
9595
.await

src/query/pipeline/core/src/pipeline.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,9 @@ impl Drop for Pipeline {
533533
}
534534
}
535535

536-
/// Builds the storage prefix under which spill files are kept.
///
/// Returns `_query_spill/<tenant>` when `query_id` is empty and
/// `_query_spill/<tenant>/<query_id>` otherwise.
pub fn query_spill_prefix(tenant: &str, query_id: &str) -> String {
    if query_id.is_empty() {
        return format!("_query_spill/{}", tenant);
    }

    format!("_query_spill/{}/{}", tenant, query_id)
}

src/query/service/src/interpreters/hook/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
pub(crate) mod compact_hook;
1616
pub(crate) mod refresh_hook;
17+
pub(crate) mod vacuum_hook;
1718

1819
#[allow(clippy::module_inception)]
1920
mod hook;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
use std::time::Duration;
17+
18+
use databend_common_base::runtime::GlobalIORuntime;
19+
use databend_common_catalog::table_context::TableContext;
20+
use databend_common_exception::Result;
21+
use databend_common_license::license::Feature::Vacuum;
22+
use databend_common_license::license_manager::get_license_manager;
23+
use databend_common_pipeline_core::query_spill_prefix;
24+
use databend_common_storage::DataOperator;
25+
use databend_enterprise_vacuum_handler::get_vacuum_handler;
26+
27+
use crate::sessions::QueryContext;
28+
29+
pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
30+
let tenant = query_ctx.get_tenant();
31+
let settings = query_ctx.get_settings();
32+
let spill_prefix = query_spill_prefix(tenant.tenant_name(), &query_ctx.get_id());
33+
let license_manager = get_license_manager();
34+
35+
if license_manager
36+
.manager
37+
.check_enterprise_enabled(query_ctx.get_license_key(), Vacuum)
38+
.is_ok()
39+
{
40+
let handler = get_vacuum_handler();
41+
42+
let _ = GlobalIORuntime::instance().block_on(async move {
43+
let vacuum_limit = match settings.get_max_vacuum_temp_files_after_query()? {
44+
0 => None,
45+
v => Some(v as usize),
46+
};
47+
let removed_files = handler
48+
.do_vacuum_temporary_files(
49+
spill_prefix.clone(),
50+
Some(Duration::from_secs(0)),
51+
vacuum_limit,
52+
)
53+
.await;
54+
55+
if (removed_files.is_ok() && vacuum_limit.is_none())
56+
&& !matches!(removed_files, Ok(res) if Some(res) != vacuum_limit)
57+
{
58+
let op = DataOperator::instance().operator();
59+
op.create_dir(&format!("{}/", spill_prefix)).await?;
60+
op.write(&format!("{}/finished", spill_prefix), vec![])
61+
.await?;
62+
}
63+
64+
Ok(())
65+
});
66+
}
67+
68+
Ok(())
69+
}

src/query/service/src/interpreters/interpreter.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use databend_common_sql::Planner;
3232
use log::error;
3333
use log::info;
3434

35+
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
3536
use crate::interpreters::interpreter_txn_commit::CommitInterpreter;
3637
use crate::interpreters::InterpreterMetrics;
3738
use crate::interpreters::InterpreterQueryLog;
@@ -132,6 +133,8 @@ pub trait Interpreter: Sync + Send {
132133
}
133134
}
134135

136+
hook_vacuum_temp_files(&query_ctx)?;
137+
135138
let err_opt = match may_error {
136139
Ok(_) => None,
137140
Err(e) => Some(e.clone()),

0 commit comments

Comments
 (0)