Skip to content

Commit 3ba7d93

Browse files
authored
Merge branch 'main' into num_func_v2
2 parents 38e3d26 + 165458d commit 3ba7d93

File tree

2 files changed

+97
-74
lines changed

2 files changed

+97
-74
lines changed

src/query/service/src/pipelines/executor/pipeline_executor.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,11 @@ impl PipelineExecutor {
178178
}
179179
}
180180

181-
if let Err(error_code) = self.graph.check_finished() {
182-
let may_error = Some(error_code);
183-
(self.on_finished_callback)(&may_error)?;
184-
return Err(may_error.unwrap());
185-
}
181+
// if let Err(error_code) = self.graph.check_finished() {
182+
// let may_error = Some(error_code);
183+
// (self.on_finished_callback)(&may_error)?;
184+
// return Err(may_error.unwrap());
185+
// }
186186

187187
(self.on_finished_callback)(&None)?;
188188
Ok(())

src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs

Lines changed: 92 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::sync::atomic::AtomicBool;
16+
use std::sync::atomic::Ordering;
1617
use std::sync::mpsc::Receiver;
1718
use std::sync::mpsc::RecvTimeoutError;
1819
use std::sync::mpsc::SyncSender;
@@ -23,53 +24,93 @@ use common_base::base::Thread;
2324
use common_datablocks::DataBlock;
2425
use common_exception::ErrorCode;
2526
use common_exception::Result;
27+
use common_pipeline_core::SinkPipeBuilder;
28+
use parking_lot::Condvar;
29+
use parking_lot::Mutex;
2630

2731
use crate::pipelines::executor::executor_settings::ExecutorSettings;
2832
use crate::pipelines::executor::PipelineExecutor;
2933
use crate::pipelines::processors::port::InputPort;
3034
use crate::pipelines::processors::processor::ProcessorPtr;
3135
use crate::pipelines::processors::Sink;
3236
use crate::pipelines::processors::Sinker;
33-
use crate::pipelines::Pipe;
3437
use crate::pipelines::Pipeline;
3538
use crate::pipelines::PipelineBuildResult;
3639

3740
struct State {
38-
sender: SyncSender<Result<Option<DataBlock>>>,
41+
is_catch_error: AtomicBool,
42+
finish_mutex: Mutex<bool>,
43+
finish_condvar: Condvar,
44+
45+
catch_error: Mutex<Option<ErrorCode>>,
3946
}
4047

4148
impl State {
42-
pub fn create(sender: SyncSender<Result<Option<DataBlock>>>) -> Arc<State> {
43-
Arc::new(State { sender })
49+
pub fn create() -> Arc<State> {
50+
Arc::new(State {
51+
catch_error: Mutex::new(None),
52+
is_catch_error: AtomicBool::new(false),
53+
finish_mutex: Mutex::new(false),
54+
finish_condvar: Condvar::new(),
55+
})
56+
}
57+
58+
pub fn finished(&self, message: Result<()>) {
59+
if let Err(error) = message {
60+
self.is_catch_error.store(true, Ordering::Release);
61+
*self.catch_error.lock() = Some(error);
62+
}
63+
64+
let mut mutex = self.finish_mutex.lock();
65+
*mutex = true;
66+
self.finish_condvar.notify_one();
67+
}
68+
69+
pub fn wait_finish(&self) {
70+
let mut mutex = self.finish_mutex.lock();
71+
72+
while !*mutex {
73+
self.finish_condvar.wait(&mut mutex);
74+
}
75+
}
76+
77+
pub fn is_catch_error(&self) -> bool {
78+
self.is_catch_error.load(Ordering::Relaxed)
79+
}
80+
81+
pub fn get_catch_error(&self) -> ErrorCode {
82+
let catch_error = self.catch_error.lock();
83+
84+
match catch_error.as_ref() {
85+
None => ErrorCode::LogicalError("It's a bug."),
86+
Some(catch_error) => catch_error.clone(),
87+
}
4488
}
4589
}
4690

4791
// Use this executor when the pipeline is pulling pipeline (exists source but not exists sink)
4892
pub struct PipelinePullingExecutor {
4993
state: Arc<State>,
5094
executor: Arc<PipelineExecutor>,
51-
receiver: Receiver<Result<Option<DataBlock>>>,
95+
receiver: Receiver<DataBlock>,
5296
}
5397

5498
impl PipelinePullingExecutor {
55-
fn wrap_pipeline(
56-
pipeline: &mut Pipeline,
57-
tx: SyncSender<Result<Option<DataBlock>>>,
58-
) -> Result<()> {
99+
fn wrap_pipeline(pipeline: &mut Pipeline, tx: SyncSender<DataBlock>) -> Result<()> {
59100
if pipeline.is_pushing_pipeline()? || !pipeline.is_pulling_pipeline()? {
60101
return Err(ErrorCode::LogicalError(
61102
"Logical error, PipelinePullingExecutor can only work on pulling pipeline.",
62103
));
63104
}
64105

65-
pipeline.resize(1)?;
66-
let input = InputPort::create();
106+
let mut sink_pipe_builder = SinkPipeBuilder::create();
107+
108+
for _index in 0..pipeline.output_len() {
109+
let input = InputPort::create();
110+
sink_pipe_builder.add_sink(input.clone(), PullingSink::create(tx.clone(), input));
111+
}
67112

68-
pipeline.add_pipe(Pipe::SimplePipe {
69-
outputs_port: vec![],
70-
inputs_port: vec![input.clone()],
71-
processors: vec![PullingSink::create(tx, input)],
72-
});
113+
pipeline.add_pipe(sink_pipe_builder.finalize());
73114
Ok(())
74115
}
75116

@@ -79,14 +120,13 @@ impl PipelinePullingExecutor {
79120
settings: ExecutorSettings,
80121
) -> Result<PipelinePullingExecutor> {
81122
let (sender, receiver) = std::sync::mpsc::sync_channel(pipeline.output_len());
82-
let state = State::create(sender.clone());
83123

84124
Self::wrap_pipeline(&mut pipeline, sender)?;
85125
let executor = PipelineExecutor::create(query_need_abort, pipeline, settings)?;
86126
Ok(PipelinePullingExecutor {
87127
receiver,
88-
state,
89128
executor,
129+
state: State::create(),
90130
})
91131
}
92132

@@ -97,15 +137,14 @@ impl PipelinePullingExecutor {
97137
) -> Result<PipelinePullingExecutor> {
98138
let mut main_pipeline = build_res.main_pipeline;
99139
let (sender, receiver) = std::sync::mpsc::sync_channel(main_pipeline.output_len());
100-
let state = State::create(sender.clone());
101140
Self::wrap_pipeline(&mut main_pipeline, sender)?;
102141

103142
let mut pipelines = build_res.sources_pipelines;
104143
pipelines.push(main_pipeline);
105144

106145
Ok(PipelinePullingExecutor {
107146
receiver,
108-
state,
147+
state: State::create(),
109148
executor: PipelineExecutor::from_pipelines(query_need_abort, pipelines, settings)?,
110149
})
111150
}
@@ -136,17 +175,7 @@ impl PipelinePullingExecutor {
136175

137176
fn thread_function(state: Arc<State>, executor: Arc<PipelineExecutor>) -> impl Fn() {
138177
move || {
139-
if let Err(cause) = executor.execute() {
140-
if let Err(send_err) = state.sender.send(Err(cause)) {
141-
tracing::warn!("Send error {:?}", send_err);
142-
}
143-
144-
return;
145-
}
146-
147-
if let Err(send_err) = state.sender.send(Ok(None)) {
148-
tracing::warn!("Send finish event error {:?}", send_err);
149-
}
178+
state.finished(executor.execute());
150179
}
151180
}
152181

@@ -155,36 +184,30 @@ impl PipelinePullingExecutor {
155184
}
156185

157186
pub fn pull_data(&mut self) -> Result<Option<DataBlock>> {
158-
match self.receiver.recv() {
159-
Ok(data_block) => data_block,
160-
Err(_recv_err) => Err(ErrorCode::LogicalError("Logical error, receiver error.")),
161-
}
162-
}
187+
loop {
188+
return match self.receiver.recv_timeout(Duration::from_millis(100)) {
189+
Ok(data_block) => Ok(Some(data_block)),
190+
Err(RecvTimeoutError::Timeout) => {
191+
if self.state.is_catch_error() {
192+
return Err(self.state.get_catch_error());
193+
}
163194

164-
pub fn try_pull_data<F>(&mut self, f: F) -> Result<Option<DataBlock>>
165-
where F: Fn() -> bool {
166-
if !self.executor.is_finished() {
167-
while !f() {
168-
return match self.receiver.recv_timeout(Duration::from_millis(100)) {
169-
Ok(data_block) => data_block,
170-
Err(RecvTimeoutError::Timeout) => {
171-
continue;
195+
continue;
196+
}
197+
Err(_disconnected) => {
198+
if !self.executor.is_finished() {
199+
self.executor.finish()?;
172200
}
173-
Err(RecvTimeoutError::Disconnected) => {
174-
Err(ErrorCode::LogicalError("Logical error, receiver error."))
201+
202+
self.state.wait_finish();
203+
204+
if self.state.is_catch_error() {
205+
return Err(self.state.get_catch_error());
175206
}
176-
};
177-
}
178-
Ok(None)
179-
} else {
180-
match self.receiver.try_recv() {
181-
Ok(data_block) => data_block,
182-
// puller will not pull again once it received a None
183-
Err(err) => Err(ErrorCode::LogicalError(format!(
184-
"Logical error, try receiver error. after executor finish {}",
185-
err
186-
))),
187-
}
207+
208+
Ok(None)
209+
}
210+
};
188211
}
189212
}
190213
}
@@ -198,31 +221,31 @@ impl Drop for PipelinePullingExecutor {
198221
}
199222

200223
struct PullingSink {
201-
sender: SyncSender<Result<Option<DataBlock>>>,
224+
sender: Option<SyncSender<DataBlock>>,
202225
}
203226

204227
impl PullingSink {
205-
pub fn create(
206-
tx: SyncSender<Result<Option<DataBlock>>>,
207-
input: Arc<InputPort>,
208-
) -> ProcessorPtr {
209-
Sinker::create(input, PullingSink { sender: tx })
228+
pub fn create(tx: SyncSender<DataBlock>, input: Arc<InputPort>) -> ProcessorPtr {
229+
Sinker::create(input, PullingSink { sender: Some(tx) })
210230
}
211231
}
212232

213233
impl Sink for PullingSink {
214234
const NAME: &'static str = "PullingExecutorSink";
215235

216236
fn on_finish(&mut self) -> Result<()> {
237+
drop(self.sender.take());
217238
Ok(())
218239
}
219240

220241
fn consume(&mut self, data_block: DataBlock) -> Result<()> {
221-
if let Err(cause) = self.sender.send(Ok(Some(data_block))) {
222-
return Err(ErrorCode::LogicalError(format!(
223-
"Logical error, cannot push data into SyncSender, cause {:?}",
224-
cause
225-
)));
242+
if let Some(sender) = &self.sender {
243+
if let Err(cause) = sender.send(data_block) {
244+
return Err(ErrorCode::LogicalError(format!(
245+
"Logical error, cannot push data into SyncSender, cause {:?}",
246+
cause
247+
)));
248+
}
226249
}
227250

228251
Ok(())

0 commit comments

Comments
 (0)