Skip to content

Commit 6981624

Browse files
lichuangdantengsky
andauthored
feat: optimize vacuum drop table, execute in parallel (#15478)
* feat: optimize vacuum drop table, execute in parallel * feat: optimize vacuum drop table, execute in parallel * feat: optimize vacuum drop table, execute in parallel * feat: optimize vacuum drop table, execute in parallel * feat: optimize vacuum drop table, execute in parallel * Update src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs --------- Co-authored-by: dantengsky <dantengsky@gmail.com>
1 parent 2d7beb4 commit 6981624

File tree

5 files changed

+114
-80
lines changed

5 files changed

+114
-80
lines changed

src/query/ee/src/storages/fuse/operations/handler.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@ impl VacuumHandler for RealVacuumHandler {
4646

4747
async fn do_vacuum_drop_tables(
4848
&self,
49+
threads_nums: usize,
4950
tables: Vec<Arc<dyn Table>>,
5051
dry_run_limit: Option<usize>,
5152
) -> Result<Option<Vec<VacuumDropFileInfo>>> {
52-
do_vacuum_drop_tables(tables, dry_run_limit).await
53+
do_vacuum_drop_tables(threads_nums, tables, dry_run_limit).await
5354
}
5455

5556
async fn do_vacuum_temporary_files(

src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs

Lines changed: 98 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::sync::Arc;
1616
use std::time::Instant;
1717

18+
use databend_common_base::runtime::execute_futures_in_parallel;
1819
use databend_common_catalog::table::Table;
1920
use databend_common_exception::Result;
2021
use databend_common_meta_app::schema::TableInfo;
@@ -28,108 +29,131 @@ use opendal::Operator;
2829

2930
#[async_backtrace::framed]
3031
pub async fn do_vacuum_drop_table(
31-
table_info: &TableInfo,
32-
operator: &Operator,
32+
tables: Vec<(TableInfo, Operator)>,
3333
dry_run_limit: Option<usize>,
3434
) -> Result<Option<Vec<VacuumDropFileInfo>>> {
35-
let dir = format!("{}/", FuseTable::parse_storage_prefix(table_info)?);
35+
let mut list_files = vec![];
36+
for (table_info, operator) in tables {
37+
let dir = format!("{}/", FuseTable::parse_storage_prefix(&table_info)?);
3638

37-
info!(
38-
"vacuum drop table {:?} dir {:?}, is_external_table:{:?}",
39-
table_info.name,
40-
dir,
41-
table_info.meta.storage_params.is_some()
42-
);
39+
info!(
40+
"vacuum drop table {:?} dir {:?}, is_external_table:{:?}",
41+
table_info.name,
42+
dir,
43+
table_info.meta.storage_params.is_some()
44+
);
4345

44-
let start = Instant::now();
46+
let start = Instant::now();
4547

46-
let ret = match dry_run_limit {
47-
None => {
48-
operator.remove_all(&dir).await?;
49-
Ok(None)
50-
}
51-
Some(dry_run_limit) => {
52-
let mut ds = operator
53-
.lister_with(&dir)
54-
.recursive(true)
55-
.metakey(Metakey::Mode)
56-
.metakey(Metakey::ContentLength)
57-
.await?;
58-
let mut list_files = Vec::new();
59-
while let Some(de) = ds.try_next().await? {
60-
let meta = de.metadata();
61-
if EntryMode::FILE == meta.mode() {
62-
list_files.push((
63-
table_info.name.clone(),
64-
de.name().to_string(),
65-
meta.content_length(),
66-
));
67-
if list_files.len() >= dry_run_limit {
68-
break;
48+
match dry_run_limit {
49+
None => {
50+
operator.remove_all(&dir).await?;
51+
}
52+
Some(dry_run_limit) => {
53+
let mut ds = operator
54+
.lister_with(&dir)
55+
.recursive(true)
56+
.metakey(Metakey::Mode)
57+
.metakey(Metakey::ContentLength)
58+
.await?;
59+
60+
while let Some(de) = ds.try_next().await? {
61+
let meta = de.metadata();
62+
if EntryMode::FILE == meta.mode() {
63+
list_files.push((
64+
table_info.name.clone(),
65+
de.name().to_string(),
66+
meta.content_length(),
67+
));
68+
if list_files.len() >= dry_run_limit {
69+
break;
70+
}
6971
}
7072
}
7173
}
74+
};
7275

73-
Ok(Some(list_files))
74-
}
75-
};
76-
77-
info!(
78-
"vacuum drop table {:?} dir {:?}, cost:{} sec",
79-
table_info.name,
80-
dir,
81-
start.elapsed().as_secs()
82-
);
83-
ret
76+
info!(
77+
"vacuum drop table {:?} dir {:?}, cost:{} sec",
78+
table_info.name,
79+
dir,
80+
start.elapsed().as_secs()
81+
);
82+
}
83+
Ok(if dry_run_limit.is_some() {
84+
Some(list_files)
85+
} else {
86+
None
87+
})
8488
}
8589

8690
#[async_backtrace::framed]
8791
pub async fn do_vacuum_drop_tables(
92+
threads_nums: usize,
8893
tables: Vec<Arc<dyn Table>>,
8994
dry_run_limit: Option<usize>,
9095
) -> Result<Option<Vec<VacuumDropFileInfo>>> {
9196
let start = Instant::now();
9297
let tables_len = tables.len();
9398
info!("do_vacuum_drop_tables {} tables", tables_len);
94-
let mut list_files = Vec::new();
95-
let mut left_limit = dry_run_limit;
99+
100+
let batch_size = (tables_len / threads_nums).min(50).max(1);
101+
102+
let mut table_vecs = Vec::with_capacity(tables.len());
96103
for table in tables {
97-
// only operate fuse table
98-
let ret = if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
99-
let table_info = table.get_table_info();
100-
let operator = fuse_table.get_operator_ref();
101-
do_vacuum_drop_table(table_info, operator, left_limit).await?
102-
} else {
103-
info!(
104-
"ignore table {}, which is not of FUSE engine. Table engine {}",
105-
table.get_table_info().name,
106-
table.engine()
107-
);
108-
continue;
109-
};
110-
if let Some(ret) = ret {
111-
list_files.extend(ret);
112-
if list_files.len() >= dry_run_limit.unwrap() {
104+
let (table_info, operator) =
105+
if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
106+
(fuse_table.get_table_info(), fuse_table.get_operator())
107+
} else {
113108
info!(
114-
"do_vacuum_drop_tables {} tables, cost:{} sec",
115-
tables_len,
116-
start.elapsed().as_secs()
109+
"ignore table {}, which is not of FUSE engine. Table engine {}",
110+
table.get_table_info().name,
111+
table.engine()
117112
);
118-
return Ok(Some(list_files));
119-
} else {
120-
left_limit = Some(dry_run_limit.unwrap() - list_files.len());
113+
continue;
114+
};
115+
116+
table_vecs.push((table_info.clone(), operator));
117+
}
118+
119+
let result = if batch_size >= table_vecs.len() {
120+
do_vacuum_drop_table(table_vecs, dry_run_limit).await?
121+
} else {
122+
let mut chunks = table_vecs.chunks(batch_size);
123+
let dry_run_limit = dry_run_limit
124+
.map(|dry_run_limit| (dry_run_limit / threads_nums).min(dry_run_limit).max(1));
125+
let tasks = std::iter::from_fn(move || {
126+
chunks
127+
.next()
128+
.map(|tables| do_vacuum_drop_table(tables.to_vec(), dry_run_limit))
129+
});
130+
131+
let result = execute_futures_in_parallel(
132+
tasks,
133+
threads_nums,
134+
threads_nums * 2,
135+
"batch-vacuum-drop-tables-worker".to_owned(),
136+
)
137+
.await?;
138+
if dry_run_limit.is_some() {
139+
let mut ret_files = vec![];
140+
for file in result {
141+
// return error if any errors happens during `do_vacuum_drop_table`
142+
if let Some(files) = file? {
143+
ret_files.extend(files);
144+
}
121145
}
146+
Some(ret_files)
147+
} else {
148+
None
122149
}
123-
}
150+
};
151+
124152
info!(
125153
"do_vacuum_drop_tables {} tables, cost:{} sec",
126154
tables_len,
127155
start.elapsed().as_secs()
128156
);
129157

130-
Ok(if dry_run_limit.is_some() {
131-
Some(list_files)
132-
} else {
133-
None
134-
})
158+
Ok(result)
135159
}

src/query/ee/tests/it/storages/fuse/operations/vacuum.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databend_common_meta_app::schema::TableInfo;
2121
use databend_common_meta_app::schema::TableMeta;
2222
use databend_common_meta_app::storage::StorageParams;
2323
use databend_common_storage::DataOperator;
24+
use databend_common_storages_fuse::TableContext;
2425
use databend_enterprise_query::storages::fuse::do_vacuum_drop_tables;
2526
use databend_enterprise_query::storages::fuse::operations::vacuum_drop_tables::do_vacuum_drop_table;
2627
use databend_enterprise_query::storages::fuse::operations::vacuum_temporary_files::do_vacuum_temporary_files;
@@ -69,10 +70,12 @@ async fn test_fuse_do_vacuum_drop_tables() -> Result<()> {
6970
let tbl = fixture.default_table_name();
7071
let qry = format!("drop table {}.{}", db, tbl);
7172
fixture.execute_command(&qry).await?;
73+
let ctx = fixture.new_query_ctx().await?;
74+
let threads_nums = ctx.get_settings().get_max_threads()? as usize;
7275

7376
// verify dry run never delete files
7477
{
75-
do_vacuum_drop_tables(vec![table.clone()], Some(100)).await?;
78+
do_vacuum_drop_tables(threads_nums, vec![table.clone()], Some(100)).await?;
7679
check_data_dir(
7780
&fixture,
7881
"test_fuse_do_vacuum_drop_table: verify generate files",
@@ -88,7 +91,7 @@ async fn test_fuse_do_vacuum_drop_tables() -> Result<()> {
8891
}
8992

9093
{
91-
do_vacuum_drop_tables(vec![table], None).await?;
94+
do_vacuum_drop_tables(threads_nums, vec![table], None).await?;
9295

9396
// after vacuum drop tables, verify the files number
9497
check_data_dir(
@@ -243,7 +246,8 @@ async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> {
243246
let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::new());
244247
let operator = OperatorBuilder::new(faulty_accessor.clone()).finish();
245248

246-
let result = do_vacuum_drop_table(&table_info, &operator, None).await;
249+
let tables = vec![(table_info, operator)];
250+
let result = do_vacuum_drop_table(tables, None).await;
247251
assert!(result.is_err());
248252

249253
// verify that accessor.delete() was called
@@ -271,7 +275,8 @@ async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> {
271275
let accessor = std::sync::Arc::new(AccessorFaultyDeletion::new());
272276
let operator = OperatorBuilder::new(accessor.clone()).finish();
273277

274-
let result = do_vacuum_drop_table(&table_info, &operator, None).await;
278+
let tables = vec![(table_info, operator)];
279+
let result = do_vacuum_drop_table(tables, None).await;
275280
assert!(result.is_err());
276281

277282
// verify that accessor.delete() was called

src/query/ee_features/vacuum_handler/src/vacuum_handler.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub trait VacuumHandler: Sync + Send {
3838

3939
async fn do_vacuum_drop_tables(
4040
&self,
41+
threads_nums: usize,
4142
tables: Vec<Arc<dyn Table>>,
4243
dry_run_limit: Option<usize>,
4344
) -> Result<Option<Vec<VacuumDropFileInfo>>>;
@@ -75,11 +76,12 @@ impl VacuumHandlerWrapper {
7576
#[async_backtrace::framed]
7677
pub async fn do_vacuum_drop_tables(
7778
&self,
79+
threads_nums: usize,
7880
tables: Vec<Arc<dyn Table>>,
7981
dry_run_limit: Option<usize>,
8082
) -> Result<Option<Vec<VacuumDropFileInfo>>> {
8183
self.handler
82-
.do_vacuum_drop_tables(tables, dry_run_limit)
84+
.do_vacuum_drop_tables(threads_nums, tables, dry_run_limit)
8385
.await
8486
}
8587

src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,10 @@ impl Interpreter for VacuumDropTablesInterpreter {
158158
.collect::<Vec<_>>();
159159

160160
let handler = get_vacuum_handler();
161+
let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
161162
let files_opt = handler
162163
.do_vacuum_drop_tables(
164+
threads_nums,
163165
tables,
164166
if self.plan.option.dry_run.is_some() {
165167
Some(DRY_RUN_LIMIT)

0 commit comments

Comments
 (0)