@@ -19,7 +19,11 @@ use databend_common_exception::Result;
19
19
use databend_common_expression:: DataSchemaRef ;
20
20
use databend_common_expression:: LimitType ;
21
21
use databend_common_expression:: SortColumnDescription ;
22
+ use databend_common_pipeline_core:: processors:: InputPort ;
23
+ use databend_common_pipeline_core:: processors:: OutputPort ;
22
24
use databend_common_pipeline_core:: processors:: ProcessorPtr ;
25
+ use databend_common_pipeline_core:: Pipe ;
26
+ use databend_common_pipeline_core:: PipeItem ;
23
27
use databend_common_pipeline_core:: Pipeline ;
24
28
use databend_common_pipeline_transforms:: processors:: add_k_way_merge_sort;
25
29
use databend_common_pipeline_transforms:: processors:: sort:: utils:: add_order_field;
@@ -38,6 +42,7 @@ use databend_storages_common_cache::TempDirManager;
38
42
39
43
use crate :: pipelines:: memory_settings:: MemorySettingsExt ;
40
44
use crate :: pipelines:: processors:: transforms:: add_range_shuffle_route;
45
+ use crate :: pipelines:: processors:: transforms:: BoundedMergeSortBuilder ;
41
46
use crate :: pipelines:: processors:: transforms:: SortInjector ;
42
47
use crate :: pipelines:: processors:: transforms:: SortRangeExchange ;
43
48
use crate :: pipelines:: processors:: transforms:: TransformLimit ;
@@ -156,7 +161,7 @@ impl PipelineBuilder {
156
161
self . build_pipeline ( & sort. input ) ?;
157
162
}
158
163
159
- self . main_pipeline . resize ( 1 , false )
164
+ builder . build_bounded_merge_sort ( & mut self . main_pipeline )
160
165
}
161
166
SortStep :: Route => self . main_pipeline . resize ( 1 , false ) ,
162
167
}
@@ -253,7 +258,7 @@ impl SortPipelineBuilder {
253
258
let memory_settings = MemorySettings :: from_sort_settings ( & self . ctx ) ?;
254
259
let enable_loser_tree = settings. get_enable_loser_tree_merge_sort ( ) ?;
255
260
256
- let builder = TransformSortBuilder :: create (
261
+ let builder = TransformSortBuilder :: new (
257
262
self . schema . clone ( ) ,
258
263
self . sort_desc . clone ( ) ,
259
264
max_block_size,
@@ -325,7 +330,7 @@ impl SortPipelineBuilder {
325
330
let memory_settings = MemorySettings :: from_sort_settings ( & self . ctx ) ?;
326
331
let enable_loser_tree = settings. get_enable_loser_tree_merge_sort ( ) ?;
327
332
328
- let builder = TransformSortBuilder :: create (
333
+ let builder = TransformSortBuilder :: new (
329
334
self . schema . clone ( ) ,
330
335
self . sort_desc . clone ( ) ,
331
336
max_block_size,
@@ -377,12 +382,10 @@ impl SortPipelineBuilder {
377
382
// Merge sort
378
383
let need_multi_merge = pipeline. output_len ( ) > 1 ;
379
384
let output_order_col = need_multi_merge || !self . remove_order_col_at_last ;
380
- debug_assert ! ( if order_col_generated {
385
+ debug_assert ! (
381
386
// If `order_col_generated`, it means this transform is the last processor in the distributed sort pipeline.
382
- !output_order_col
383
- } else {
384
- true
385
- } ) ;
387
+ !order_col_generated || !output_order_col
388
+ ) ;
386
389
387
390
let memory_settings = MemorySettings :: from_sort_settings ( & self . ctx ) ?;
388
391
let sort_merge_output_schema = match output_order_col {
@@ -414,7 +417,7 @@ impl SortPipelineBuilder {
414
417
} ;
415
418
416
419
pipeline. add_transform ( |input, output| {
417
- let builder = TransformSortBuilder :: create (
420
+ let builder = TransformSortBuilder :: new (
418
421
sort_merge_output_schema. clone ( ) ,
419
422
self . sort_desc . clone ( ) ,
420
423
self . block_size ,
@@ -476,4 +479,32 @@ impl SortPipelineBuilder {
476
479
pub fn exchange_injector ( & self ) -> Arc < dyn ExchangeInjector > {
477
480
Arc :: new ( SortInjector { } )
478
481
}
482
+
483
+ pub fn build_bounded_merge_sort ( self , pipeline : & mut Pipeline ) -> Result < ( ) > {
484
+ let inputs_port: Vec < _ > = ( 0 ..pipeline. output_len ( ) )
485
+ . map ( |_| InputPort :: create ( ) )
486
+ . collect ( ) ;
487
+ let output_port = OutputPort :: create ( ) ;
488
+
489
+ let processor = ProcessorPtr :: create (
490
+ BoundedMergeSortBuilder :: new (
491
+ inputs_port. clone ( ) ,
492
+ output_port. clone ( ) ,
493
+ self . schema . clone ( ) ,
494
+ self . sort_desc . clone ( ) ,
495
+ self . block_size ,
496
+ self . limit ,
497
+ self . remove_order_col_at_last ,
498
+ self . enable_loser_tree ,
499
+ )
500
+ . build ( ) ?,
501
+ ) ;
502
+
503
+ pipeline. add_pipe ( Pipe :: create ( inputs_port. len ( ) , 1 , vec ! [ PipeItem :: create(
504
+ processor,
505
+ inputs_port,
506
+ vec![ output_port] ,
507
+ ) ] ) ) ;
508
+ Ok ( ( ) )
509
+ }
479
510
}
0 commit comments