@@ -41,14 +41,8 @@ use databend_common_storages_fuse::TableContext;
 use databend_storages_common_cache::TempDirManager;
 
 use crate::pipelines::memory_settings::MemorySettingsExt;
-use crate::pipelines::processors::transforms::add_range_shuffle_route;
-use crate::pipelines::processors::transforms::BoundedMergeSortBuilder;
-use crate::pipelines::processors::transforms::SortInjector;
-use crate::pipelines::processors::transforms::SortRangeExchange;
-use crate::pipelines::processors::transforms::TransformLimit;
 use crate::pipelines::processors::transforms::TransformSortBuilder;
 use crate::pipelines::PipelineBuilder;
-use crate::servers::flight::v1::exchange::ExchangeInjector;
 use crate::sessions::QueryContext;
 use crate::spillers::Spiller;
 use crate::spillers::SpillerConfig;
@@ -148,12 +142,12 @@ impl PipelineBuilder {
 
             SortStep::Sample => {
                 builder.build_sample(&mut self.main_pipeline)?;
-                self.exchange_injector = Arc::new(SortInjector {});
+                self.exchange_injector = TransformSortBuilder::exchange_injector();
                 Ok(())
             }
             SortStep::SortShuffled => {
                 if matches!(*sort.input, PhysicalPlan::ExchangeSource(_)) {
-                    let exchange = Arc::new(SortInjector {});
+                    let exchange = TransformSortBuilder::exchange_injector();
                     let old_inject = std::mem::replace(&mut self.exchange_injector, exchange);
                     self.build_pipeline(&sort.input)?;
                     self.exchange_injector = old_inject;
@@ -163,7 +157,7 @@ impl PipelineBuilder {
 
                 builder.build_bounded_merge_sort(&mut self.main_pipeline)
             }
-            SortStep::Route => self.main_pipeline.resize(1, false),
+            SortStep::Route => TransformSortBuilder::add_route(&mut self.main_pipeline),
         }
     }
 }
@@ -229,10 +223,8 @@ impl SortPipelineBuilder {
         self.build_merge_sort_pipeline(pipeline, false)
     }
 
-    pub fn build_range_shuffle_sort_pipeline(self, pipeline: &mut Pipeline) -> Result<()> {
-        let inputs = pipeline.output_len();
+    fn build_sample(self, pipeline: &mut Pipeline) -> Result<()> {
         let settings = self.ctx.get_settings();
-        let num_exec = inputs;
         let max_block_size = settings.get_max_block_size()? as usize;
 
         // Partial sort
@@ -258,123 +250,35 @@ impl SortPipelineBuilder {
         let memory_settings = MemorySettings::from_sort_settings(&self.ctx)?;
         let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?;
 
-        let builder = TransformSortBuilder::new(
-            self.schema.clone(),
-            self.sort_desc.clone(),
-            max_block_size,
-            spiller,
-        )
-        .with_limit(self.limit)
-        .with_order_col_generated(false)
-        .with_output_order_col(false)
-        .with_memory_settings(memory_settings)
-        .with_enable_loser_tree(enable_loser_tree);
+        let builder =
+            TransformSortBuilder::new(self.schema.clone(), self.sort_desc.clone(), max_block_size)
+                .with_spiller(spiller)
+                .with_limit(self.limit)
+                .with_order_column(false, true)
+                .with_memory_settings(memory_settings)
+                .with_enable_loser_tree(enable_loser_tree);
 
         pipeline.add_transform(|input, output| {
             Ok(ProcessorPtr::create(builder.build_collect(input, output)?))
         })?;
 
         builder.add_bound_broadcast(
             pipeline,
-            builder.inner_schema(),
             max_block_size,
             self.ctx.clone(),
             self.broadcast_id.unwrap(),
         )?;
 
-        pipeline.exchange(num_exec, Arc::new(SortRangeExchange))?;
-
-        pipeline.add_transform(|input, output| {
-            Ok(ProcessorPtr::create(builder.build_combine(input, output)?))
-        })?;
-
         pipeline.add_transform(|input, output| {
             Ok(ProcessorPtr::create(builder.build_restore(input, output)?))
         })?;
 
-        add_range_shuffle_route(pipeline)?;
-
-        if self.limit.is_none() {
-            return Ok(());
-        }
-
-        pipeline.add_transform(|input, output| {
-            TransformLimit::try_create(self.limit, 0, input, output).map(ProcessorPtr::create)
-        })
-    }
-
-    fn build_sample(self, pipeline: &mut Pipeline) -> Result<()> {
-        let settings = self.ctx.get_settings();
-        let max_block_size = settings.get_max_block_size()? as usize;
-
-        // Partial sort
-        pipeline.add_transformer(|| {
-            TransformSortPartial::new(
-                LimitType::from_limit_rows(self.limit),
-                self.sort_desc.clone(),
-            )
-        });
-
-        let spiller = {
-            let location_prefix = self.ctx.query_id_spill_prefix();
-            let config = SpillerConfig {
-                spiller_type: SpillerType::OrderBy,
-                location_prefix,
-                disk_spill: None,
-                use_parquet: settings.get_spilling_file_format()?.is_parquet(),
-            };
-            let op = DataOperator::instance().spill_operator();
-            Arc::new(Spiller::create(self.ctx.clone(), op, config)?)
-        };
-
-        let memory_settings = MemorySettings::from_sort_settings(&self.ctx)?;
-        let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?;
-
-        let builder = TransformSortBuilder::new(
-            self.schema.clone(),
-            self.sort_desc.clone(),
-            max_block_size,
-            spiller,
-        )
-        .with_limit(self.limit)
-        .with_order_col_generated(false)
-        .with_output_order_col(true)
-        .with_memory_settings(memory_settings)
-        .with_enable_loser_tree(enable_loser_tree);
-
-        pipeline.add_transform(|input, output| {
-            Ok(ProcessorPtr::create(builder.build_collect(input, output)?))
-        })?;
-
-        builder.add_bound_broadcast(
-            pipeline,
-            builder.inner_schema(),
-            max_block_size,
-            self.ctx.clone(),
-            self.broadcast_id.unwrap(),
-        )?;
-
         pipeline.add_transform(|input, output| {
-            Ok(ProcessorPtr::create(builder.build_restore(input, output)?))
+            Ok(ProcessorPtr::create(
+                builder.build_bound_edge(input, output)?,
+            ))
         })?;
 
-        // pipeline.exchange_with_merge(
-        //     pipeline.output_len(),
-        //     Arc::new(SortBoundExchange {}),
-        //     |inputs, output| {
-        //         Ok(ProcessorPtr::create(create_multi_sort_processor(
-        //             inputs,
-        //             output,
-        //             self.schema.clone(),
-        //             self.block_size,
-        //             self.limit,
-        //             self.sort_desc.clone(),
-        //             self.remove_order_col_at_last,
-        //             self.enable_loser_tree,
-        //         )?))
-        //     },
-        // )?;
-
         Ok(())
     }
 
@@ -421,11 +325,10 @@ impl SortPipelineBuilder {
             sort_merge_output_schema.clone(),
             self.sort_desc.clone(),
             self.block_size,
-            spiller.clone(),
         )
+        .with_spiller(spiller.clone())
         .with_limit(self.limit)
-        .with_order_col_generated(order_col_generated)
-        .with_output_order_col(output_order_col)
+        .with_order_column(order_col_generated, output_order_col)
         .with_memory_settings(memory_settings.clone())
         .with_enable_loser_tree(enable_loser_tree);
 
@@ -476,28 +379,20 @@ impl SortPipelineBuilder {
         }
     }
 
-    pub fn exchange_injector(&self) -> Arc<dyn ExchangeInjector> {
-        Arc::new(SortInjector {})
-    }
-
     pub fn build_bounded_merge_sort(self, pipeline: &mut Pipeline) -> Result<()> {
+        let builder =
+            TransformSortBuilder::new(self.schema.clone(), self.sort_desc.clone(), self.block_size)
+                .with_limit(self.limit)
+                .with_order_column(true, self.remove_order_col_at_last)
+                .with_enable_loser_tree(self.enable_loser_tree);
+
         let inputs_port: Vec<_> = (0..pipeline.output_len())
             .map(|_| InputPort::create())
             .collect();
         let output_port = OutputPort::create();
 
         let processor = ProcessorPtr::create(
-            BoundedMergeSortBuilder::new(
-                inputs_port.clone(),
-                output_port.clone(),
-                self.schema.clone(),
-                self.sort_desc.clone(),
-                self.block_size,
-                self.limit,
-                self.remove_order_col_at_last,
-                self.enable_loser_tree,
-            )
-            .build()?,
+            builder.build_bounded_merge_sort(inputs_port.clone(), output_port.clone())?,
         );
 
         pipeline.add_pipe(Pipe::create(inputs_port.len(), 1, vec![PipeItem::create(
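Reading the hunks together, the net effect at call sites is that the spiller and the two order-column flags now travel through TransformSortBuilder's fluent chain instead of its constructor. The sketch below is not part of the diff; it only mirrors the `+` lines above, and the local names (`schema`, `sort_desc`, `block_size`, `spiller`, `limit`, `memory_settings`, `enable_loser_tree`, `pipeline`) are placeholders for the SortPipelineBuilder state used in those hunks.

// Sketch only: the reshaped builder chain as used in build_sample above.
// Everything except the TransformSortBuilder methods is assumed surrounding state.
let builder = TransformSortBuilder::new(schema.clone(), sort_desc.clone(), block_size)
    .with_spiller(spiller)                  // moved out of the constructor; omitted entirely in build_bounded_merge_sort
    .with_limit(limit)
    .with_order_column(false, true)         // replaces .with_order_col_generated(false) + .with_output_order_col(true)
    .with_memory_settings(memory_settings)
    .with_enable_loser_tree(enable_loser_tree);

// The same builder then produces each pipeline stage, e.g. the collect step:
pipeline.add_transform(|input, output| {
    Ok(ProcessorPtr::create(builder.build_collect(input, output)?))
})?;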