
Commit 657cfdd

chore: tweak deletion batch size & log messages (#15005)
* adjust purge batch size
* add explicit timing logs for list and purge
1 parent a892f0a commit 657cfdd

6 files changed: +61 −26 lines changed

src/query/service/src/pipelines/builders/builder_on_finished.rs
16 additions & 15 deletions

@@ -24,6 +24,7 @@ use databend_common_pipeline_core::Pipeline;
 use databend_common_storages_fuse::io::Files;
 use databend_common_storages_stage::StageTable;
 use log::error;
+use log::info;

 use crate::pipelines::PipelineBuilder;
 use crate::sessions::QueryContext;
@@ -66,16 +67,7 @@ impl PipelineBuilder {
         // 2. Try to purge copied files if purge option is true, if error will skip.
         // If a file is already copied(status with AlreadyCopied) we will try to purge them.
         if !is_active && copy_purge_option {
-            let start = Instant::now();
             Self::try_purge_files(ctx.clone(), &stage_info, &files).await;
-
-            // Perf.
-            {
-                metrics_inc_copy_purge_files_counter(files.len() as u32);
-                metrics_inc_copy_purge_files_cost_milliseconds(
-                    start.elapsed().as_millis() as u32,
-                );
-            }
         }

         Ok(())
@@ -107,21 +99,17 @@ impl PipelineBuilder {
             }
         }

-        let start = Instant::now();
         Self::try_purge_files(ctx.clone(), &stage_info, &files).await;

-        // Perf.
-        {
-            metrics_inc_copy_purge_files_counter(files.len() as u32);
-            metrics_inc_copy_purge_files_cost_milliseconds(start.elapsed().as_millis() as u32);
-        }
         Ok(())
     }

     #[async_backtrace::framed]
     pub async fn try_purge_files(ctx: Arc<QueryContext>, stage_info: &StageInfo, files: &[String]) {
+        let start = Instant::now();
         let table_ctx: Arc<dyn TableContext> = ctx.clone();
         let op = StageTable::get_op(stage_info);
+
         match op {
             Ok(op) => {
                 let file_op = Files::create(table_ctx, op);
@@ -133,5 +121,18 @@ impl PipelineBuilder {
                 error!("Failed to get stage table op, error: {}", e);
             }
         }
+
+        let elapsed = start.elapsed();
+        info!(
+            "purged files: number {}, time used {:?} ",
+            files.len(),
+            elapsed
+        );
+
+        // Perf.
+        {
+            metrics_inc_copy_purge_files_counter(files.len() as u32);
+            metrics_inc_copy_purge_files_cost_milliseconds(elapsed.as_millis() as u32);
+        }
     }
 }
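The net effect of these hunks: the stopwatch and the purge metrics move from the two call sites into `try_purge_files` itself, so every caller gets timing, logging, and metrics for free, and the two copies can no longer drift apart. A minimal sketch of the pattern, with `do_purge` as a hypothetical stand-in for the real stage purge logic:

```rust
use std::time::Instant;

// Hypothetical stand-in for the real purge; in the code above, errors
// are logged inside and never propagated to the caller.
async fn do_purge(_files: &[String]) { /* ... */ }

// Time and record in the callee, once, instead of at every call site.
async fn try_purge_files(files: &[String]) {
    let start = Instant::now();
    do_purge(files).await;
    let elapsed = start.elapsed();
    log::info!("purged files: number {}, time used {:?}", files.len(), elapsed);
    // The metrics counters shown above would be bumped here with `elapsed`.
}
```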

src/query/sql/src/planner/binder/binder.rs
4 additions & 0 deletions

@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::time::Instant;

 use chrono_tz::Tz;
 use databend_common_ast::ast::format_statement;
@@ -34,6 +35,7 @@ use databend_common_expression::Expr;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_meta_app::principal::StageFileFormatType;
 use indexmap::IndexMap;
+use log::info;
 use log::warn;

 use super::Finder;
@@ -129,10 +131,12 @@ impl<'a> Binder {
     #[async_backtrace::framed]
     #[minitrace::trace]
     pub async fn bind(mut self, stmt: &Statement) -> Result<Plan> {
+        let start = Instant::now();
         self.ctx.set_status_info("binding");
         let mut init_bind_context = BindContext::new();
         let plan = self.bind_statement(&mut init_bind_context, stmt).await?;
         self.bind_query_index(&mut init_bind_context, &plan).await?;
+        info!("bind stmt to plan, time used: {:?}", start.elapsed());
         Ok(plan)
     }

src/query/sql/src/planner/planner.rs
3 additions & 0 deletions

@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::sync::Arc;
+use std::time::Instant;

 use databend_common_ast::ast::Expr;
 use databend_common_ast::ast::Literal;
@@ -68,6 +69,7 @@ impl Planner {
     #[async_backtrace::framed]
     #[minitrace::trace]
     pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, PlanExtras)> {
+        let start = Instant::now();
         let settings = self.ctx.get_settings();
         let sql_dialect = settings.get_sql_dialect()?;
         // compile prql to sql for prql dialect
@@ -205,6 +207,7 @@ impl Planner {
                 tokens.extend(iter);
             };
         } else {
+            info!("logical plan built, time used: {:?}", start.elapsed());
             return res;
         }
     }

src/query/sql/src/planner/plans/copy_into_table.rs
11 additions & 5 deletions

@@ -150,7 +150,11 @@ impl CopyIntoTablePlan {
         let cost_get_all_files = end_get_all_source.duration_since(start).as_millis();
         metrics_inc_copy_collect_files_get_all_source_files_milliseconds(cost_get_all_files as u64);

-        ctx.set_status_info(&format!("end list files: got {} files", num_all_files));
+        ctx.set_status_info(&format!(
+            "end list files: got {} files, time used {:?}",
+            num_all_files,
+            start.elapsed()
+        ));

         let (need_copy_file_infos, duplicated) = if self.force {
             if !self.stage_table_info.stage_info.copy_options.purge
@@ -167,6 +171,7 @@ impl CopyIntoTablePlan {
         // Status.
         ctx.set_status_info("begin filtering out copied files");

+        let filter_start = Instant::now();
         let FilteredCopyFiles {
             files_to_copy,
             duplicated_files,
@@ -180,8 +185,9 @@ impl CopyIntoTablePlan {
         )
         .await?;
         ctx.set_status_info(&format!(
-            "end filtering out copied files: {}",
-            num_all_files
+            "end filtering out copied files: {}, time used {:?}",
+            num_all_files,
+            filter_start.elapsed()
         ));

         let end_filter_out = Instant::now();
@@ -194,11 +200,11 @@ impl CopyIntoTablePlan {
         };

         info!(
-            "copy: read files with max_files={:?} finished, all:{}, need copy:{}, elapsed:{}",
+            "copy: read files with max_files={:?} finished, all:{}, need copy:{}, elapsed:{:?}",
             max_files,
             num_all_files,
             need_copy_file_infos.len(),
-            start.elapsed().as_secs()
+            start.elapsed()
         );

         if need_copy_file_infos.is_empty() {
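The last hunk replaces `start.elapsed().as_secs()` with the `Duration` value formatted via `{:?}`. `as_secs()` truncates, so any copy finishing in under a second logs `elapsed:0`, while `Duration`'s `Debug` output keeps sub-second precision. A standalone illustration (not code from this commit):

```rust
use std::time::Duration;

fn main() {
    let elapsed = Duration::from_millis(1234);
    // Old style: truncates to whole seconds.
    println!("elapsed:{}", elapsed.as_secs()); // prints "elapsed:1"
    // New style: human-readable and precise.
    println!("elapsed:{:?}", elapsed); // prints "elapsed:1.234s"
}
```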

src/query/storages/fuse/src/io/files.rs
26 additions & 4 deletions

@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::sync::Arc;
+use std::time::Instant;

 use databend_common_base::runtime::execute_futures_in_parallel;
 use databend_common_catalog::table_context::TableContext;
@@ -39,9 +40,23 @@ impl Files {
         &self,
         file_locations: impl IntoIterator<Item = impl AsRef<str>>,
     ) -> Result<()> {
-        let batch_size = 1000;
         let locations = Vec::from_iter(file_locations.into_iter().map(|v| v.as_ref().to_string()));

+        if locations.is_empty() {
+            return Ok(());
+        }
+
+        // adjusts batch_size according to the `max_threads` settings,
+        // limits its min/max value to 1 and 1000.
+        let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
+        let batch_size = (locations.len() / threads_nums).min(1000).max(1);
+
+        info!(
+            "remove file in batch, batch_size: {}, number of chunks {}",
+            batch_size,
+            (locations.len() / batch_size).max(1)
+        );
+
         if locations.len() <= batch_size {
             Self::delete_files(self.operator.clone(), locations).await?;
         } else {
@@ -53,8 +68,6 @@ impl Files {
                 .map(|location| Self::delete_files(self.operator.clone(), location.to_vec()))
             });

-        let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
-
         execute_futures_in_parallel(
             tasks,
             threads_nums,
@@ -69,14 +82,23 @@ impl Files {

     #[async_backtrace::framed]
     async fn delete_files(op: Operator, locations: Vec<String>) -> Result<()> {
+        let start = Instant::now();
         // temporary fix for https://github.com/datafuselabs/databend/issues/13804
         let locations = locations
             .into_iter()
             .map(|loc| loc.trim_start_matches('/').to_owned())
             .filter(|loc| !loc.is_empty())
             .collect::<Vec<_>>();
-        info!("deleting files: {:?}", &locations);
+        info!("deleting files {:?}", &locations);
+        let num_of_files = locations.len();
+
         op.remove(locations).await?;
+
+        info!(
+            "deleted files, number of files {}, time used {:?}",
+            num_of_files,
+            start.elapsed(),
+        );
         Ok(())
     }
 }
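Previously the batch size was a fixed 1000 regardless of workload; the new rule scales it with the file count and the `max_threads` setting, clamped to [1, 1000]. To see what that does in practice, here is the rule factored out as a standalone function with illustrative values (assumed workloads, not figures from the commit):

```rust
// batch_size = files / max_threads, clamped to the range [1, 1000].
// max_threads comes from Databend settings and is always >= 1.
fn batch_size(num_files: usize, max_threads: usize) -> usize {
    (num_files / max_threads).min(1000).max(1)
}

fn main() {
    assert_eq!(batch_size(10, 16), 1); // few files: tiny batches, still parallel
    assert_eq!(batch_size(8_000, 16), 500); // work spread evenly across 16 threads
    assert_eq!(batch_size(100_000, 16), 1_000); // capped at the old fixed size
}
```

One consequence of the old fixed size: deleting, say, 900 files ran as a single sequential batch, whereas it now fans out across threads. Note also that the logged chunk count, `(locations.len() / batch_size).max(1)`, rounds down, so it understates by one whenever the file count is not an exact multiple of the batch size.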

src/query/storages/fuse/src/operations/gc.rs
1 addition & 2 deletions

@@ -588,8 +588,7 @@ impl FuseTable {
         locations_to_be_purged: HashSet<String>,
     ) -> Result<()> {
         let fuse_file = Files::create(ctx.clone(), self.operator.clone());
-        let locations = Vec::from_iter(locations_to_be_purged);
-        fuse_file.remove_file_in_batch(&locations).await
+        fuse_file.remove_file_in_batch(locations_to_be_purged).await
     }

     // Purge file by location chunks.
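The intermediate `Vec` can be dropped because `remove_file_in_batch` takes `impl IntoIterator<Item = impl AsRef<str>>` (see files.rs above), so the `HashSet<String>` is accepted directly. A minimal sketch of why both call shapes compile (simplified and synchronous, not the real API):

```rust
use std::collections::HashSet;

// Same parameter shape as remove_file_in_batch in files.rs above.
fn remove_file_in_batch(file_locations: impl IntoIterator<Item = impl AsRef<str>>) {
    for loc in file_locations {
        println!("would delete {}", loc.as_ref());
    }
}

fn main() {
    let set: HashSet<String> = HashSet::from(["a/1".into(), "a/2".into()]);
    remove_file_in_batch(set); // HashSet<String>, no intermediate Vec
    remove_file_in_batch(vec!["b/3"]); // Vec<&str> works too
}
```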
