File tree Expand file tree Collapse file tree 2 files changed +1
-10
lines changed
src/query/pipeline/core/src/processors Expand file tree Collapse file tree 2 files changed +1
-10
lines changed Original file line number Diff line number Diff line change @@ -39,6 +39,5 @@ pub use resize_processor::create_resize_item;
39
39
pub use resize_processor:: ResizeProcessor ;
40
40
pub use shuffle_processor:: Exchange ;
41
41
pub use shuffle_processor:: MergePartitionProcessor ;
42
- pub use shuffle_processor:: MultiwayStrategy ;
43
42
pub use shuffle_processor:: PartitionProcessor ;
44
43
pub use shuffle_processor:: ShuffleProcessor ;
Original file line number Diff line number Diff line change @@ -345,10 +345,7 @@ impl<T: Exchange> Processor for MergePartitionProcessor<T> {
345
345
input. set_need_data ( ) ;
346
346
}
347
347
348
- if all_inputs_finished
349
- && ( !matches ! ( T :: STRATEGY , MultiwayStrategy :: Custom )
350
- || self . inputs_data . iter ( ) . all ( Option :: is_none) )
351
- {
348
+ if all_inputs_finished {
352
349
self . output . finish ( ) ;
353
350
return Ok ( Event :: Finished ) ;
354
351
}
@@ -360,11 +357,6 @@ impl<T: Exchange> Processor for MergePartitionProcessor<T> {
360
357
self . output . push_data ( Ok ( block) ) ;
361
358
return Ok ( Event :: NeedConsume ) ;
362
359
}
363
-
364
- if all_inputs_finished && self . inputs_data . iter ( ) . all ( Option :: is_none) {
365
- self . output . finish ( ) ;
366
- return Ok ( Event :: Finished ) ;
367
- }
368
360
}
369
361
370
362
Ok ( Event :: NeedData )
You can’t perform that action at this time.
0 commit comments