Skip to content

Commit 98873c5

Browse files
authored
feat(executor): record query profiling for standalone error query (#15425)
1 parent 3eb3798 commit 98873c5

File tree

11 files changed

+87
-80
lines changed

11 files changed

+87
-80
lines changed

โ€Žsrc/query/pipeline/core/src/pipeline.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl Debug for Pipeline {
7777
pub type InitCallback = Box<dyn FnOnce() -> Result<()> + Send + Sync + 'static>;
7878

7979
pub type FinishedCallback =
80-
Box<dyn FnOnce(&Result<Vec<PlanProfile>, ErrorCode>) -> Result<()> + Send + Sync + 'static>;
80+
Box<dyn FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static>;
8181

8282
pub type DynTransformBuilder = Box<dyn Fn(Arc<InputPort>, Arc<OutputPort>) -> Result<ProcessorPtr>>;
8383

@@ -454,15 +454,15 @@ impl Pipeline {
454454
}
455455

456456
pub fn set_on_finished<
457-
F: FnOnce(&Result<Vec<PlanProfile>, ErrorCode>) -> Result<()> + Send + Sync + 'static,
457+
F: FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static,
458458
>(
459459
&mut self,
460460
f: F,
461461
) {
462462
if let Some(on_finished) = self.on_finished.take() {
463-
self.on_finished = Some(Box::new(move |may_error| {
464-
on_finished(may_error)?;
465-
f(may_error)
463+
self.on_finished = Some(Box::new(move |(profiles, may_error)| {
464+
on_finished((profiles, may_error))?;
465+
f((profiles, may_error))
466466
}));
467467

468468
return;
@@ -472,15 +472,15 @@ impl Pipeline {
472472
}
473473

474474
pub fn push_front_on_finished_callback<
475-
F: FnOnce(&Result<Vec<PlanProfile>, ErrorCode>) -> Result<()> + Send + Sync + 'static,
475+
F: FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static,
476476
>(
477477
&mut self,
478478
f: F,
479479
) {
480480
if let Some(on_finished) = self.on_finished.take() {
481-
self.on_finished = Some(Box::new(move |may_error| {
482-
f(may_error)?;
483-
on_finished(may_error)
481+
self.on_finished = Some(Box::new(move |(profiles, may_error)| {
482+
f((profiles, may_error))?;
483+
on_finished((profiles, may_error))
484484
}));
485485

486486
return;
@@ -527,7 +527,7 @@ impl Drop for Pipeline {
527527
"Pipeline illegal state: not successfully shutdown.",
528528
));
529529

530-
let _ = (on_finished)(&cause);
530+
let _ = (on_finished)((&vec![], &cause));
531531
}
532532
})
533533
}

โ€Žsrc/query/service/src/interpreters/hook/compact_hook.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ async fn do_hook_compact(
7272
return Ok(());
7373
}
7474

75-
pipeline.set_on_finished(move |err| {
75+
pipeline.set_on_finished(move |(_profiles, err)| {
7676
let compaction_limits = match compact_target.mutation_kind {
7777
MutationKind::Insert => {
7878
let compaction_num_block_hint = ctx.get_compaction_num_block_hint();

โ€Žsrc/query/service/src/interpreters/hook/refresh_hook.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub async fn hook_refresh(
6363
return;
6464
}
6565

66-
pipeline.set_on_finished(move |err| {
66+
pipeline.set_on_finished(move |(_profiles, err)| {
6767
if err.is_ok() {
6868
info!("execute pipeline finished successfully, starting run refresh job.");
6969
match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc, need_lock)) {

โ€Žsrc/query/service/src/interpreters/interpreter.rs

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -101,47 +101,50 @@ pub trait Interpreter: Sync + Send {
101101
}
102102

103103
let query_ctx = ctx.clone();
104-
build_res.main_pipeline.set_on_finished(move |may_error| {
105-
let mut has_profiles = false;
106-
if let Ok(plans_profile) = may_error {
107-
query_ctx.add_query_profiles(plans_profile);
108-
109-
let query_profiles = query_ctx.get_query_profiles();
110-
111-
if !query_profiles.is_empty() {
112-
has_profiles = true;
113-
#[derive(serde::Serialize)]
114-
struct QueryProfiles {
115-
query_id: String,
116-
profiles: Vec<PlanProfile>,
117-
statistics_desc: Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>>,
104+
build_res
105+
.main_pipeline
106+
.set_on_finished(move |(plans_profile, may_error)| {
107+
let mut has_profiles = false;
108+
// Standalone mode or query executed is successfully
109+
if query_ctx.get_cluster().is_empty() || may_error.is_ok() {
110+
query_ctx.add_query_profiles(plans_profile);
111+
112+
let query_profiles = query_ctx.get_query_profiles();
113+
114+
if !query_profiles.is_empty() {
115+
has_profiles = true;
116+
#[derive(serde::Serialize)]
117+
struct QueryProfiles {
118+
query_id: String,
119+
profiles: Vec<PlanProfile>,
120+
statistics_desc: Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>>,
121+
}
122+
123+
info!(
124+
target: "databend::log::profile",
125+
"{}",
126+
serde_json::to_string(&QueryProfiles {
127+
query_id: query_ctx.get_id(),
128+
profiles: query_profiles,
129+
statistics_desc: get_statistics_desc(),
130+
})?
131+
);
118132
}
119-
120-
info!(
121-
target: "databend::log::profile",
122-
"{}",
123-
serde_json::to_string(&QueryProfiles {
124-
query_id: query_ctx.get_id(),
125-
profiles: query_profiles,
126-
statistics_desc: get_statistics_desc(),
127-
})?
128-
);
129133
}
130-
}
131134

132-
let err_opt = match may_error {
133-
Ok(_) => None,
134-
Err(e) => Some(e.clone()),
135-
};
135+
let err_opt = match may_error {
136+
Ok(_) => None,
137+
Err(e) => Some(e.clone()),
138+
};
136139

137-
InterpreterMetrics::record_query_finished(&query_ctx, err_opt.clone());
138-
log_query_finished(&query_ctx, err_opt, has_profiles);
140+
InterpreterMetrics::record_query_finished(&query_ctx, err_opt.clone());
141+
log_query_finished(&query_ctx, err_opt, has_profiles);
139142

140-
match may_error {
141-
Ok(_) => Ok(()),
142-
Err(error) => Err(error.clone()),
143-
}
144-
});
143+
match may_error {
144+
Ok(_) => Ok(()),
145+
Err(error) => Err(error.clone()),
146+
}
147+
});
145148

146149
ctx.set_status_info("executing pipeline");
147150

โ€Žsrc/query/service/src/interpreters/interpreter_index_refresh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ impl Interpreter for RefreshIndexInterpreter {
377377

378378
build_res
379379
.main_pipeline
380-
.set_on_finished(move |may_error| match may_error {
380+
.set_on_finished(move |(_profiles, may_error)| match may_error {
381381
Ok(_) => GlobalIORuntime::instance()
382382
.block_on(async move { modify_last_update(ctx, req).await }),
383383
Err(error_code) => Err(error_code.clone()),

โ€Žsrc/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ impl CreateTableInterpreter {
265265

266266
pipeline
267267
.main_pipeline
268-
.push_front_on_finished_callback(move |err| {
268+
.push_front_on_finished_callback(move |(_profiles, err)| {
269269
if err.is_ok() {
270270
let qualified_table_name = format!("{}.{}", db_name, table_name);
271271
let undrop_fut = async move {

โ€Žsrc/query/service/src/interpreters/interpreter_table_optimize.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ impl OptimizeTableInterpreter {
258258
let start = SystemTime::now();
259259
build_res
260260
.main_pipeline
261-
.set_on_finished(move |may_error| match may_error {
261+
.set_on_finished(move |(_profiles, may_error)| match may_error {
262262
Ok(_) => InterpreterClusteringHistory::write_log(
263263
&ctx,
264264
start,
@@ -282,7 +282,7 @@ impl OptimizeTableInterpreter {
282282
} else {
283283
build_res
284284
.main_pipeline
285-
.set_on_finished(move |may_error| match may_error {
285+
.set_on_finished(move |(_profiles, may_error)| match may_error {
286286
Ok(_) => GlobalIORuntime::instance()
287287
.block_on(async move { purge(ctx, catalog, plan, None).await }),
288288
Err(error_code) => Err(error_code.clone()),

โ€Žsrc/query/service/src/pipelines/builders/builder_on_finished.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ impl PipelineBuilder {
4747
is_active
4848
};
4949
// set on_finished callback.
50-
main_pipeline.set_on_finished(move |may_error| {
50+
main_pipeline.set_on_finished(move |(_profiles, may_error)| {
5151
match may_error {
5252
Ok(_) => {
5353
GlobalIORuntime::instance().block_on(async move {

โ€Žsrc/query/service/src/pipelines/executor/pipeline_executor.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::pipelines::executor::RunningGraph;
4444
pub type InitCallback = Box<dyn FnOnce() -> Result<()> + Send + Sync + 'static>;
4545

4646
pub type FinishedCallback =
47-
Box<dyn FnOnce(&Result<Vec<PlanProfile>, ErrorCode>) -> Result<()> + Send + Sync + 'static>;
47+
Box<dyn FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static>;
4848

4949
pub struct QueryWrapper {
5050
graph: Arc<RunningGraph>,
@@ -206,7 +206,7 @@ impl PipelineExecutor {
206206
let guard = query_wrapper.on_finished_callback.lock().take();
207207
if let Some(on_finished_callback) = guard {
208208
catch_unwind(move || {
209-
on_finished_callback(&Ok(self.get_plans_profile()))
209+
on_finished_callback((&self.get_plans_profile(), &Ok(())))
210210
})??;
211211
}
212212
Ok(())
@@ -215,7 +215,9 @@ impl PipelineExecutor {
215215
let guard = query_wrapper.on_finished_callback.lock().take();
216216
let cause_clone = cause.clone();
217217
if let Some(on_finished_callback) = guard {
218-
catch_unwind(move || on_finished_callback(&Err(cause_clone)))??;
218+
catch_unwind(move || {
219+
on_finished_callback((&self.get_plans_profile(), &Err(cause_clone)))
220+
})??;
219221
}
220222
Err(cause)
221223
}

โ€Žsrc/query/service/src/pipelines/executor/query_pipeline_executor.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use crate::pipelines::executor::WorkersCondvar;
5656
pub type InitCallback = Box<dyn FnOnce() -> Result<()> + Send + Sync + 'static>;
5757

5858
pub type FinishedCallback =
59-
Box<dyn FnOnce(&Result<Vec<PlanProfile>, ErrorCode>) -> Result<()> + Send + Sync + 'static>;
59+
Box<dyn FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static>;
6060

6161
pub struct QueryPipelineExecutor {
6262
threads_num: usize,
@@ -92,7 +92,7 @@ impl QueryPipelineExecutor {
9292

9393
match RunningGraph::create(pipeline, 1, settings.query_id.clone(), None) {
9494
Err(cause) => {
95-
let _ = on_finished_callback(&Err(cause.clone()));
95+
let _ = on_finished_callback((&vec![], &Err(cause.clone())));
9696
Err(cause)
9797
}
9898
Ok(running_graph) => Self::try_create(
@@ -163,7 +163,7 @@ impl QueryPipelineExecutor {
163163
match RunningGraph::from_pipelines(pipelines, 1, settings.query_id.clone(), None) {
164164
Err(cause) => {
165165
if let Some(on_finished_callback) = on_finished_callback {
166-
let _ = on_finished_callback(&Err(cause.clone()));
166+
let _ = on_finished_callback((&vec![], &Err(cause.clone())));
167167
}
168168

169169
Err(cause)
@@ -205,7 +205,7 @@ impl QueryPipelineExecutor {
205205
}))
206206
}
207207

208-
fn on_finished(&self, error: &Result<Vec<PlanProfile>, ErrorCode>) -> Result<()> {
208+
fn on_finished(&self, error: (&Vec<PlanProfile>, &Result<()>)) -> Result<()> {
209209
let mut guard = self.on_finished_callback.lock();
210210
if let Some(on_finished_callback) = guard.take() {
211211
drop(guard);
@@ -263,26 +263,26 @@ impl QueryPipelineExecutor {
263263
let may_error = error.clone();
264264
drop(finished_error_guard);
265265

266-
self.on_finished(&Err(may_error.clone()))?;
266+
self.on_finished((&self.get_plans_profile(), &Err(may_error.clone())))?;
267267
return Err(may_error);
268268
}
269269
}
270270

271271
// We will ignore the abort query error, because returned by finished_error if abort query.
272272
if matches!(&thread_res, Err(error) if error.code() != ErrorCode::ABORTED_QUERY) {
273273
let may_error = thread_res.unwrap_err();
274-
self.on_finished(&Err(may_error.clone()))?;
274+
self.on_finished((&self.get_plans_profile(), &Err(may_error.clone())))?;
275275
return Err(may_error);
276276
}
277277
}
278278

279279
if let Err(error) = self.graph.assert_finished_graph() {
280-
self.on_finished(&Err(error.clone()))?;
280+
self.on_finished((&self.get_plans_profile(), &Err(error.clone())))?;
281281
return Err(error);
282282
}
283283

284284
// self.settings.query_id
285-
self.on_finished(&Ok(self.get_plans_profile()))?;
285+
self.on_finished((&self.get_plans_profile(), &Ok(())))?;
286286
Ok(())
287287
}
288288

@@ -544,7 +544,7 @@ impl Drop for QueryPipelineExecutor {
544544

545545
if let Err(cause) = catch_unwind(move || {
546546
let _guard = ThreadTracker::tracking(tracking_payload);
547-
on_finished_callback(&Err(cause))
547+
on_finished_callback((&self.get_plans_profile(), &Err(cause)))
548548
})
549549
.flatten()
550550
{

0 commit comments

Comments
ย (0)