Skip to content

Commit 81f45ee

Browse files
committed
refactor(processor): save pipeline executor into query context
1 parent f2265e1 commit 81f45ee

18 files changed

+170
-187
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_exception::ErrorCode;
16+
use common_exception::Result;
17+
18+
pub fn catch_unwind<F: FnOnce() -> R, R>(f: F) -> Result<R> {
19+
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
20+
Ok(res) => Ok(res),
21+
Err(cause) => match cause.downcast_ref::<&'static str>() {
22+
None => match cause.downcast_ref::<String>() {
23+
None => Err(ErrorCode::PanicError("Sorry, unknown panic message")),
24+
Some(message) => Err(ErrorCode::PanicError(message.to_string())),
25+
},
26+
Some(message) => Err(ErrorCode::PanicError(message.to_string())),
27+
},
28+
}
29+
}

src/common/base/src/base/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod catch_unwind;
1516
mod global_runtime;
1617
mod net;
1718
mod profiling;
@@ -27,6 +28,7 @@ mod string_func;
2728
mod thread;
2829
mod uniq_id;
2930

31+
pub use catch_unwind::catch_unwind;
3032
pub use global_runtime::GlobalIORuntime;
3133
pub use net::get_free_tcp_port;
3234
pub use net::get_free_udp_port;
@@ -53,6 +55,7 @@ pub use string_func::mask_string;
5355
pub use string_func::replace_nth_char;
5456
pub use string_func::unescape_for_key;
5557
pub use thread::Thread;
58+
pub use thread::ThreadJoinHandle;
5659
pub use tokio;
5760
pub use uniq_id::GlobalSequence;
5861
pub use uniq_id::GlobalUniqName;

src/common/base/src/base/thread.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,38 @@
1515
use std::thread::Builder;
1616
use std::thread::JoinHandle;
1717

18+
use common_exception::ErrorCode;
19+
use common_exception::Result;
20+
1821
use super::runtime_tracker::ThreadTracker;
1922

2023
pub struct Thread;
2124

25+
pub struct ThreadJoinHandle<T> {
26+
inner: JoinHandle<T>,
27+
}
28+
29+
impl<T> ThreadJoinHandle<T> {
30+
pub fn create(inner: JoinHandle<T>) -> ThreadJoinHandle<T> {
31+
ThreadJoinHandle { inner }
32+
}
33+
34+
pub fn join(self) -> Result<T> {
35+
match self.inner.join() {
36+
Ok(res) => Ok(res),
37+
Err(cause) => match cause.downcast_ref::<&'static str>() {
38+
None => match cause.downcast_ref::<String>() {
39+
None => Err(ErrorCode::PanicError("Sorry, unknown panic message")),
40+
Some(message) => Err(ErrorCode::PanicError(message.to_string())),
41+
},
42+
Some(message) => Err(ErrorCode::PanicError(message.to_string())),
43+
},
44+
}
45+
}
46+
}
47+
2248
impl Thread {
23-
pub fn named_spawn<F, T>(mut name: Option<String>, f: F) -> JoinHandle<T>
49+
pub fn named_spawn<F, T>(mut name: Option<String>, f: F) -> ThreadJoinHandle<T>
2450
where
2551
F: FnOnce() -> T,
2652
F: Send + 'static,
@@ -42,18 +68,18 @@ impl Thread {
4268
thread_builder = thread_builder.name(named);
4369
}
4470

45-
match ThreadTracker::current_runtime_tracker() {
71+
ThreadJoinHandle::create(match ThreadTracker::current_runtime_tracker() {
4672
None => thread_builder.spawn(f).unwrap(),
4773
Some(runtime_tracker) => thread_builder
4874
.spawn(move || {
4975
ThreadTracker::create(runtime_tracker);
5076
f()
5177
})
5278
.unwrap(),
53-
}
79+
})
5480
}
5581

56-
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
82+
pub fn spawn<F, T>(f: F) -> ThreadJoinHandle<T>
5783
where
5884
F: FnOnce() -> T,
5985
F: Send + 'static,

src/query/service/src/api/rpc/exchange/exchange_manager.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -506,9 +506,7 @@ impl QueryCoordinator {
506506
pub fn shutdown_query(&mut self) {
507507
if let Some(query_info) = &self.info {
508508
if let Some(query_executor) = &query_info.query_executor {
509-
if let Err(cause) = query_executor.finish() {
510-
tracing::error!("Cannot shutdown query, because {:?}", cause);
511-
}
509+
query_executor.finish(None);
512510
}
513511
}
514512
}
@@ -553,14 +551,9 @@ impl QueryCoordinator {
553551
}
554552
}
555553

556-
let query_need_abort = info.query_ctx.query_need_abort();
557554
let executor_settings = ExecutorSettings::try_create(&info.query_ctx.get_settings())?;
558555

559-
let executor = PipelineCompleteExecutor::from_pipelines(
560-
query_need_abort,
561-
pipelines,
562-
executor_settings,
563-
)?;
556+
let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
564557

565558
self.fragment_exchanges.clear();
566559
let info_mut = self.info.as_mut().expect("Query info is None");

src/query/service/src/interpreters/async_insert_queue.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,10 @@ impl AsyncInsertManager {
409409
let mut pipelines = build_res.sources_pipelines;
410410
pipelines.push(build_res.main_pipeline);
411411
let executor_settings = ExecutorSettings::try_create(settings)?;
412-
let executor = PipelineCompleteExecutor::from_pipelines(
413-
ctx.query_need_abort(),
414-
pipelines,
415-
executor_settings,
416-
)?;
412+
let executor =
413+
PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
417414

415+
ctx.set_executor(Arc::downgrade(&executor.get_inner()));
418416
executor.execute()?;
419417
drop(executor);
420418
let blocks = ctx.consume_precommit_blocks();

src/query/service/src/interpreters/interpreter.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,17 @@ pub trait Interpreter: Sync + Send {
5656
}
5757

5858
let settings = ctx.get_settings();
59-
let query_need_abort = ctx.query_need_abort();
6059
let executor_settings = ExecutorSettings::try_create(&settings)?;
6160
build_res.set_max_threads(settings.get_max_threads()? as usize);
6261

6362
if build_res.main_pipeline.is_complete_pipeline()? {
6463
let mut pipelines = build_res.sources_pipelines;
6564
pipelines.push(build_res.main_pipeline);
6665

67-
let complete_executor = PipelineCompleteExecutor::from_pipelines(
68-
query_need_abort,
69-
pipelines,
70-
executor_settings,
71-
)?;
66+
let complete_executor =
67+
PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
7268

69+
ctx.set_executor(Arc::downgrade(&complete_executor.get_inner()));
7370
complete_executor.execute()?;
7471
return Ok(Box::pin(DataBlockStream::create(
7572
Arc::new(DataSchema::new(vec![])),
@@ -83,12 +80,12 @@ pub trait Interpreter: Sync + Send {
8380
return handle.execute(ctx.clone(), build_res, self.schema()).await;
8481
}
8582

83+
let pulling_executor =
84+
PipelinePullingExecutor::from_pipelines(build_res, executor_settings)?;
85+
86+
ctx.set_executor(Arc::downgrade(&pulling_executor.get_inner()));
8687
Ok(Box::pin(Box::pin(ProcessorExecutorStream::create(
87-
PipelinePullingExecutor::from_pipelines(
88-
ctx.query_need_abort(),
89-
build_res,
90-
executor_settings,
91-
)?,
88+
pulling_executor,
9289
)?)))
9390
}
9491

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,12 @@ pub fn append2table(
113113
}
114114

115115
pub fn execute_pipeline(ctx: Arc<QueryContext>, mut res: PipelineBuildResult) -> Result<()> {
116-
let query_need_abort = ctx.query_need_abort();
117116
let executor_settings = ExecutorSettings::try_create(&ctx.get_settings())?;
118117
res.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
119118
let mut pipelines = res.sources_pipelines;
120119
pipelines.push(res.main_pipeline);
121-
let executor =
122-
PipelineCompleteExecutor::from_pipelines(query_need_abort, pipelines, executor_settings)?;
120+
let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
121+
ctx.set_executor(Arc::downgrade(&executor.get_inner()));
123122
executor.execute()
124123
}
125124

src/query/service/src/interpreters/interpreter_table_optimize.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,10 @@ impl Interpreter for OptimizeTableInterpreter {
7070
if let Some(mutator) = mutator {
7171
let settings = ctx.get_settings();
7272
pipeline.set_max_threads(settings.get_max_threads()? as usize);
73-
let query_need_abort = ctx.query_need_abort();
7473
let executor_settings = ExecutorSettings::try_create(&settings)?;
75-
let executor = PipelineCompleteExecutor::try_create(
76-
query_need_abort,
77-
pipeline,
78-
executor_settings,
79-
)?;
74+
let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
75+
76+
ctx.set_executor(Arc::downgrade(&executor.get_inner()));
8077
executor.execute()?;
8178
drop(executor);
8279

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,10 @@ impl Interpreter for ReclusterTableInterpreter {
7474

7575
pipeline.set_max_threads(settings.get_max_threads()? as usize);
7676

77-
let query_need_abort = ctx.query_need_abort();
7877
let executor_settings = ExecutorSettings::try_create(&settings)?;
79-
let executor = PipelineCompleteExecutor::try_create(
80-
query_need_abort,
81-
pipeline,
82-
executor_settings,
83-
)?;
78+
let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
79+
80+
ctx.set_executor(Arc::downgrade(&executor.get_inner()));
8481
executor.execute()?;
8582
drop(executor);
8683

src/query/service/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#![feature(option_get_or_insert_default)]
2525
#![feature(result_option_inspect)]
2626
#![feature(is_some_with)]
27+
#![feature(result_flattening)]
2728

2829
extern crate core;
2930

0 commit comments

Comments
 (0)