Skip to content

Commit 729a243

Browse files
committed
fix
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent 6adc65a commit 729a243

File tree

2 files changed

+1
-10
lines changed

2 files changed

+1
-10
lines changed

src/query/pipeline/core/src/processors/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,5 @@ pub use resize_processor::create_resize_item;
3939
pub use resize_processor::ResizeProcessor;
4040
pub use shuffle_processor::Exchange;
4141
pub use shuffle_processor::MergePartitionProcessor;
42-
pub use shuffle_processor::MultiwayStrategy;
4342
pub use shuffle_processor::PartitionProcessor;
4443
pub use shuffle_processor::ShuffleProcessor;

src/query/pipeline/core/src/processors/shuffle_processor.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -345,10 +345,7 @@ impl<T: Exchange> Processor for MergePartitionProcessor<T> {
345345
input.set_need_data();
346346
}
347347

348-
if all_inputs_finished
349-
&& (!matches!(T::STRATEGY, MultiwayStrategy::Custom)
350-
|| self.inputs_data.iter().all(Option::is_none))
351-
{
348+
if all_inputs_finished {
352349
self.output.finish();
353350
return Ok(Event::Finished);
354351
}
@@ -360,11 +357,6 @@ impl<T: Exchange> Processor for MergePartitionProcessor<T> {
360357
self.output.push_data(Ok(block));
361358
return Ok(Event::NeedConsume);
362359
}
363-
364-
if all_inputs_finished && self.inputs_data.iter().all(Option::is_none) {
365-
self.output.finish();
366-
return Ok(Event::Finished);
367-
}
368360
}
369361

370362
Ok(Event::NeedData)

0 commit comments

Comments
 (0)