diff --git a/src/query/pipeline/core/src/processors/shuffle_processor.rs b/src/query/pipeline/core/src/processors/shuffle_processor.rs index 724e76bc3839b..5b1308113f7db 100644 --- a/src/query/pipeline/core/src/processors/shuffle_processor.rs +++ b/src/query/pipeline/core/src/processors/shuffle_processor.rs @@ -340,7 +340,10 @@ impl Processor for MergePartitionProcessor { input.set_need_data(); } - if all_inputs_finished { + if all_inputs_finished + && (!matches!(T::STRATEGY, MultiwayStrategy::Custom) + || self.inputs_data.iter().all(Option::is_none)) + { self.output.finish(); return Ok(Event::Finished); } @@ -352,6 +355,11 @@ impl Processor for MergePartitionProcessor { self.output.push_data(Ok(block)); return Ok(Event::NeedConsume); } + + if all_inputs_finished && self.inputs_data.iter().all(Option::is_none) { + self.output.finish(); + return Ok(Event::Finished); + } } Ok(Event::NeedData)