13
13
// limitations under the License.
14
14
15
15
use std:: sync:: atomic:: AtomicBool ;
16
+ use std:: sync:: atomic:: Ordering ;
16
17
use std:: sync:: mpsc:: Receiver ;
17
18
use std:: sync:: mpsc:: RecvTimeoutError ;
18
19
use std:: sync:: mpsc:: SyncSender ;
@@ -23,53 +24,93 @@ use common_base::base::Thread;
23
24
use common_datablocks:: DataBlock ;
24
25
use common_exception:: ErrorCode ;
25
26
use common_exception:: Result ;
27
+ use common_pipeline_core:: SinkPipeBuilder ;
28
+ use parking_lot:: Condvar ;
29
+ use parking_lot:: Mutex ;
26
30
27
31
use crate :: pipelines:: executor:: executor_settings:: ExecutorSettings ;
28
32
use crate :: pipelines:: executor:: PipelineExecutor ;
29
33
use crate :: pipelines:: processors:: port:: InputPort ;
30
34
use crate :: pipelines:: processors:: processor:: ProcessorPtr ;
31
35
use crate :: pipelines:: processors:: Sink ;
32
36
use crate :: pipelines:: processors:: Sinker ;
33
- use crate :: pipelines:: Pipe ;
34
37
use crate :: pipelines:: Pipeline ;
35
38
use crate :: pipelines:: PipelineBuildResult ;
36
39
37
40
struct State {
38
- sender : SyncSender < Result < Option < DataBlock > > > ,
41
+ is_catch_error : AtomicBool ,
42
+ finish_mutex : Mutex < bool > ,
43
+ finish_condvar : Condvar ,
44
+
45
+ catch_error : Mutex < Option < ErrorCode > > ,
39
46
}
40
47
41
48
impl State {
42
- pub fn create ( sender : SyncSender < Result < Option < DataBlock > > > ) -> Arc < State > {
43
- Arc :: new ( State { sender } )
49
+ pub fn create ( ) -> Arc < State > {
50
+ Arc :: new ( State {
51
+ catch_error : Mutex :: new ( None ) ,
52
+ is_catch_error : AtomicBool :: new ( false ) ,
53
+ finish_mutex : Mutex :: new ( false ) ,
54
+ finish_condvar : Condvar :: new ( ) ,
55
+ } )
56
+ }
57
+
58
+ pub fn finished ( & self , message : Result < ( ) > ) {
59
+ if let Err ( error) = message {
60
+ self . is_catch_error . store ( true , Ordering :: Release ) ;
61
+ * self . catch_error . lock ( ) = Some ( error) ;
62
+ }
63
+
64
+ let mut mutex = self . finish_mutex . lock ( ) ;
65
+ * mutex = true ;
66
+ self . finish_condvar . notify_one ( ) ;
67
+ }
68
+
69
+ pub fn wait_finish ( & self ) {
70
+ let mut mutex = self . finish_mutex . lock ( ) ;
71
+
72
+ while !* mutex {
73
+ self . finish_condvar . wait ( & mut mutex) ;
74
+ }
75
+ }
76
+
77
+ pub fn is_catch_error ( & self ) -> bool {
78
+ self . is_catch_error . load ( Ordering :: Relaxed )
79
+ }
80
+
81
+ pub fn get_catch_error ( & self ) -> ErrorCode {
82
+ let catch_error = self . catch_error . lock ( ) ;
83
+
84
+ match catch_error. as_ref ( ) {
85
+ None => ErrorCode :: LogicalError ( "It's a bug." ) ,
86
+ Some ( catch_error) => catch_error. clone ( ) ,
87
+ }
44
88
}
45
89
}
46
90
47
91
// Use this executor when the pipeline is pulling pipeline (exists source but not exists sink)
48
92
pub struct PipelinePullingExecutor {
49
93
state : Arc < State > ,
50
94
executor : Arc < PipelineExecutor > ,
51
- receiver : Receiver < Result < Option < DataBlock > > > ,
95
+ receiver : Receiver < DataBlock > ,
52
96
}
53
97
54
98
impl PipelinePullingExecutor {
55
- fn wrap_pipeline (
56
- pipeline : & mut Pipeline ,
57
- tx : SyncSender < Result < Option < DataBlock > > > ,
58
- ) -> Result < ( ) > {
99
+ fn wrap_pipeline ( pipeline : & mut Pipeline , tx : SyncSender < DataBlock > ) -> Result < ( ) > {
59
100
if pipeline. is_pushing_pipeline ( ) ? || !pipeline. is_pulling_pipeline ( ) ? {
60
101
return Err ( ErrorCode :: LogicalError (
61
102
"Logical error, PipelinePullingExecutor can only work on pulling pipeline." ,
62
103
) ) ;
63
104
}
64
105
65
- pipeline. resize ( 1 ) ?;
66
- let input = InputPort :: create ( ) ;
106
+ let mut sink_pipe_builder = SinkPipeBuilder :: create ( ) ;
107
+
108
+ for _index in 0 ..pipeline. output_len ( ) {
109
+ let input = InputPort :: create ( ) ;
110
+ sink_pipe_builder. add_sink ( input. clone ( ) , PullingSink :: create ( tx. clone ( ) , input) ) ;
111
+ }
67
112
68
- pipeline. add_pipe ( Pipe :: SimplePipe {
69
- outputs_port : vec ! [ ] ,
70
- inputs_port : vec ! [ input. clone( ) ] ,
71
- processors : vec ! [ PullingSink :: create( tx, input) ] ,
72
- } ) ;
113
+ pipeline. add_pipe ( sink_pipe_builder. finalize ( ) ) ;
73
114
Ok ( ( ) )
74
115
}
75
116
@@ -79,14 +120,13 @@ impl PipelinePullingExecutor {
79
120
settings : ExecutorSettings ,
80
121
) -> Result < PipelinePullingExecutor > {
81
122
let ( sender, receiver) = std:: sync:: mpsc:: sync_channel ( pipeline. output_len ( ) ) ;
82
- let state = State :: create ( sender. clone ( ) ) ;
83
123
84
124
Self :: wrap_pipeline ( & mut pipeline, sender) ?;
85
125
let executor = PipelineExecutor :: create ( query_need_abort, pipeline, settings) ?;
86
126
Ok ( PipelinePullingExecutor {
87
127
receiver,
88
- state,
89
128
executor,
129
+ state : State :: create ( ) ,
90
130
} )
91
131
}
92
132
@@ -97,15 +137,14 @@ impl PipelinePullingExecutor {
97
137
) -> Result < PipelinePullingExecutor > {
98
138
let mut main_pipeline = build_res. main_pipeline ;
99
139
let ( sender, receiver) = std:: sync:: mpsc:: sync_channel ( main_pipeline. output_len ( ) ) ;
100
- let state = State :: create ( sender. clone ( ) ) ;
101
140
Self :: wrap_pipeline ( & mut main_pipeline, sender) ?;
102
141
103
142
let mut pipelines = build_res. sources_pipelines ;
104
143
pipelines. push ( main_pipeline) ;
105
144
106
145
Ok ( PipelinePullingExecutor {
107
146
receiver,
108
- state,
147
+ state : State :: create ( ) ,
109
148
executor : PipelineExecutor :: from_pipelines ( query_need_abort, pipelines, settings) ?,
110
149
} )
111
150
}
@@ -136,17 +175,7 @@ impl PipelinePullingExecutor {
136
175
137
176
fn thread_function ( state : Arc < State > , executor : Arc < PipelineExecutor > ) -> impl Fn ( ) {
138
177
move || {
139
- if let Err ( cause) = executor. execute ( ) {
140
- if let Err ( send_err) = state. sender . send ( Err ( cause) ) {
141
- tracing:: warn!( "Send error {:?}" , send_err) ;
142
- }
143
-
144
- return ;
145
- }
146
-
147
- if let Err ( send_err) = state. sender . send ( Ok ( None ) ) {
148
- tracing:: warn!( "Send finish event error {:?}" , send_err) ;
149
- }
178
+ state. finished ( executor. execute ( ) ) ;
150
179
}
151
180
}
152
181
@@ -155,36 +184,30 @@ impl PipelinePullingExecutor {
155
184
}
156
185
157
186
pub fn pull_data ( & mut self ) -> Result < Option < DataBlock > > {
158
- match self . receiver . recv ( ) {
159
- Ok ( data_block) => data_block,
160
- Err ( _recv_err) => Err ( ErrorCode :: LogicalError ( "Logical error, receiver error." ) ) ,
161
- }
162
- }
187
+ loop {
188
+ return match self . receiver . recv_timeout ( Duration :: from_millis ( 100 ) ) {
189
+ Ok ( data_block) => Ok ( Some ( data_block) ) ,
190
+ Err ( RecvTimeoutError :: Timeout ) => {
191
+ if self . state . is_catch_error ( ) {
192
+ return Err ( self . state . get_catch_error ( ) ) ;
193
+ }
163
194
164
- pub fn try_pull_data < F > ( & mut self , f : F ) -> Result < Option < DataBlock > >
165
- where F : Fn ( ) -> bool {
166
- if !self . executor . is_finished ( ) {
167
- while !f ( ) {
168
- return match self . receiver . recv_timeout ( Duration :: from_millis ( 100 ) ) {
169
- Ok ( data_block) => data_block,
170
- Err ( RecvTimeoutError :: Timeout ) => {
171
- continue ;
195
+ continue ;
196
+ }
197
+ Err ( _disconnected) => {
198
+ if !self . executor . is_finished ( ) {
199
+ self . executor . finish ( ) ?;
172
200
}
173
- Err ( RecvTimeoutError :: Disconnected ) => {
174
- Err ( ErrorCode :: LogicalError ( "Logical error, receiver error." ) )
201
+
202
+ self . state . wait_finish ( ) ;
203
+
204
+ if self . state . is_catch_error ( ) {
205
+ return Err ( self . state . get_catch_error ( ) ) ;
175
206
}
176
- } ;
177
- }
178
- Ok ( None )
179
- } else {
180
- match self . receiver . try_recv ( ) {
181
- Ok ( data_block) => data_block,
182
- // puller will not pull again once it received a None
183
- Err ( err) => Err ( ErrorCode :: LogicalError ( format ! (
184
- "Logical error, try receiver error. after executor finish {}" ,
185
- err
186
- ) ) ) ,
187
- }
207
+
208
+ Ok ( None )
209
+ }
210
+ } ;
188
211
}
189
212
}
190
213
}
@@ -198,31 +221,31 @@ impl Drop for PipelinePullingExecutor {
198
221
}
199
222
200
223
struct PullingSink {
201
- sender : SyncSender < Result < Option < DataBlock > > > ,
224
+ sender : Option < SyncSender < DataBlock > > ,
202
225
}
203
226
204
227
impl PullingSink {
205
- pub fn create (
206
- tx : SyncSender < Result < Option < DataBlock > > > ,
207
- input : Arc < InputPort > ,
208
- ) -> ProcessorPtr {
209
- Sinker :: create ( input, PullingSink { sender : tx } )
228
+ pub fn create ( tx : SyncSender < DataBlock > , input : Arc < InputPort > ) -> ProcessorPtr {
229
+ Sinker :: create ( input, PullingSink { sender : Some ( tx) } )
210
230
}
211
231
}
212
232
213
233
impl Sink for PullingSink {
214
234
const NAME : & ' static str = "PullingExecutorSink" ;
215
235
216
236
fn on_finish ( & mut self ) -> Result < ( ) > {
237
+ drop ( self . sender . take ( ) ) ;
217
238
Ok ( ( ) )
218
239
}
219
240
220
241
fn consume ( & mut self , data_block : DataBlock ) -> Result < ( ) > {
221
- if let Err ( cause) = self . sender . send ( Ok ( Some ( data_block) ) ) {
222
- return Err ( ErrorCode :: LogicalError ( format ! (
223
- "Logical error, cannot push data into SyncSender, cause {:?}" ,
224
- cause
225
- ) ) ) ;
242
+ if let Some ( sender) = & self . sender {
243
+ if let Err ( cause) = sender. send ( data_block) {
244
+ return Err ( ErrorCode :: LogicalError ( format ! (
245
+ "Logical error, cannot push data into SyncSender, cause {:?}" ,
246
+ cause
247
+ ) ) ) ;
248
+ }
226
249
}
227
250
228
251
Ok ( ( ) )
0 commit comments