@@ -34,17 +34,18 @@ use databend_common_pipeline_transforms::MemorySettings;
34
34
use super :: collect:: TransformSortCollect ;
35
35
use super :: execute:: TransformSortExecute ;
36
36
use super :: merge_sort:: TransformSort ;
37
+ use super :: shuffle:: SortSampleState ;
38
+ use super :: shuffle:: TransformSortShuffle ;
37
39
use crate :: spillers:: Spiller ;
38
40
39
41
/// Selects which sort processor a build request should produce.
enum SortType {
    /// Produce a `TransformSort`.
    Sort,
    /// Produce a `TransformSortCollect`.
    Collect,
    /// Produce a `TransformSortExecute`.
    Execute,
    /// Produce a `TransformSortShuffle`.
    Shuffle,
}
44
47
45
48
pub struct TransformSortBuilder {
46
- input : Arc < InputPort > ,
47
- output : Arc < OutputPort > ,
48
49
schema : DataSchemaRef ,
49
50
block_size : usize ,
50
51
sort_desc : Arc < [ SortColumnDescription ] > ,
@@ -54,22 +55,16 @@ pub struct TransformSortBuilder {
54
55
spiller : Arc < Spiller > ,
55
56
enable_loser_tree : bool ,
56
57
limit : Option < usize > ,
57
- processor : Option < Result < Box < dyn Processor > > > ,
58
- typ : SortType ,
59
58
}
60
59
61
60
impl TransformSortBuilder {
62
61
pub fn create (
63
- input : Arc < InputPort > ,
64
- output : Arc < OutputPort > ,
65
62
schema : DataSchemaRef ,
66
63
sort_desc : Arc < [ SortColumnDescription ] > ,
67
64
block_size : usize ,
68
65
spiller : Arc < Spiller > ,
69
66
) -> Self {
70
- Self {
71
- input,
72
- output,
67
+ TransformSortBuilder {
73
68
block_size,
74
69
schema,
75
70
sort_desc,
@@ -79,8 +74,6 @@ impl TransformSortBuilder {
79
74
enable_loser_tree : false ,
80
75
limit : None ,
81
76
memory_settings : MemorySettings :: disable_spill ( ) ,
82
- processor : None ,
83
- typ : SortType :: Sort ,
84
77
}
85
78
}
86
79
@@ -109,34 +102,133 @@ impl TransformSortBuilder {
109
102
self
110
103
}
111
104
112
- pub fn build ( mut self ) -> Result < Box < dyn Processor > > {
113
- debug_assert ! ( if self . output_order_col {
105
+ pub fn build (
106
+ & self ,
107
+ input : Arc < InputPort > ,
108
+ output : Arc < OutputPort > ,
109
+ ) -> Result < Box < dyn Processor > > {
110
+ self . check ( ) ;
111
+
112
+ let mut build = Build {
113
+ params : self ,
114
+ input,
115
+ output,
116
+ processor : None ,
117
+ typ : SortType :: Sort ,
118
+ id : 0 ,
119
+ state : None ,
120
+ } ;
121
+
122
+ select_row_type ( & mut build) ;
123
+ build. processor . unwrap ( )
124
+ }
125
+
126
+ pub fn build_collect (
127
+ & self ,
128
+ input : Arc < InputPort > ,
129
+ output : Arc < OutputPort > ,
130
+ ) -> Result < Box < dyn Processor > > {
131
+ self . check ( ) ;
132
+
133
+ let mut build = Build {
134
+ params : self ,
135
+ input,
136
+ output,
137
+ processor : None ,
138
+ typ : SortType :: Collect ,
139
+ id : 0 ,
140
+ state : None ,
141
+ } ;
142
+
143
+ select_row_type ( & mut build) ;
144
+ build. processor . unwrap ( )
145
+ }
146
+
147
+ pub fn build_exec (
148
+ & self ,
149
+ input : Arc < InputPort > ,
150
+ output : Arc < OutputPort > ,
151
+ ) -> Result < Box < dyn Processor > > {
152
+ self . check ( ) ;
153
+
154
+ let mut build = Build {
155
+ params : self ,
156
+ input,
157
+ output,
158
+ processor : None ,
159
+ typ : SortType :: Execute ,
160
+ id : 0 ,
161
+ state : None ,
162
+ } ;
163
+
164
+ select_row_type ( & mut build) ;
165
+ build. processor . unwrap ( )
166
+ }
167
+
168
+ pub fn build_shuffle (
169
+ & self ,
170
+ input : Arc < InputPort > ,
171
+ output : Arc < OutputPort > ,
172
+ id : usize ,
173
+ state : Arc < SortSampleState > ,
174
+ ) -> Result < Box < dyn Processor > > {
175
+ self . check ( ) ;
176
+
177
+ let mut build = Build {
178
+ params : self ,
179
+ input,
180
+ output,
181
+ processor : None ,
182
+ typ : SortType :: Shuffle ,
183
+ id,
184
+ state : Some ( state) ,
185
+ } ;
186
+
187
+ select_row_type ( & mut build) ;
188
+ build. processor . unwrap ( )
189
+ }
190
+
191
+ fn should_use_sort_limit ( & self ) -> bool {
192
+ self . limit . map ( |limit| limit < 10000 ) . unwrap_or_default ( )
193
+ }
194
+
195
+ fn check ( & self ) {
196
+ assert ! ( if self . output_order_col {
114
197
self . schema. has_field( ORDER_COL_NAME )
115
198
} else {
116
199
!self . schema. has_field( ORDER_COL_NAME )
117
200
} ) ;
118
-
119
- select_row_type ( & mut self ) ;
120
- self . processor . unwrap ( )
121
201
}
202
+ }
122
203
204
+ pub struct Build < ' a > {
205
+ params : & ' a TransformSortBuilder ,
206
+ typ : SortType ,
207
+ input : Arc < InputPort > ,
208
+ output : Arc < OutputPort > ,
209
+ processor : Option < Result < Box < dyn Processor > > > ,
210
+ id : usize ,
211
+ state : Option < Arc < SortSampleState > > ,
212
+ }
213
+
214
+ impl Build < ' _ > {
123
215
fn build_sort < A , C > ( & mut self ) -> Result < Box < dyn Processor > >
124
216
where
125
217
A : SortAlgorithm + ' static ,
126
218
C : RowConverter < A :: Rows > + Send + ' static ,
127
219
{
128
- let schema = add_order_field ( self . schema . clone ( ) , & self . sort_desc ) ;
220
+ let schema = add_order_field ( self . params . schema . clone ( ) , & self . params . sort_desc ) ;
129
221
Ok ( Box :: new ( TransformSort :: < A , C > :: new (
130
222
self . input . clone ( ) ,
131
223
self . output . clone ( ) ,
132
224
schema,
133
- self . sort_desc . clone ( ) ,
134
- self . block_size ,
135
- self . limit . map ( |limit| ( limit, false ) ) ,
136
- self . spiller . clone ( ) ,
137
- self . output_order_col ,
138
- self . order_col_generated ,
139
- self . memory_settings . clone ( ) ,
225
+ self . params . sort_desc . clone ( ) ,
226
+ self . params . block_size ,
227
+ self . params . limit . map ( |limit| ( limit, false ) ) ,
228
+ self . params . spiller . clone ( ) ,
229
+ self . params . output_order_col ,
230
+ self . params . order_col_generated ,
231
+ self . params . memory_settings . clone ( ) ,
140
232
) ?) )
141
233
}
142
234
@@ -145,50 +237,38 @@ impl TransformSortBuilder {
145
237
A : SortAlgorithm + ' static ,
146
238
C : RowConverter < A :: Rows > + Send + ' static ,
147
239
{
148
- let schema = add_order_field ( self . schema . clone ( ) , & self . sort_desc ) ;
240
+ let schema = add_order_field ( self . params . schema . clone ( ) , & self . params . sort_desc ) ;
149
241
Ok ( Box :: new ( TransformSort :: < A , C > :: new (
150
242
self . input . clone ( ) ,
151
243
self . output . clone ( ) ,
152
244
schema,
153
- self . sort_desc . clone ( ) ,
154
- self . block_size ,
155
- Some ( ( self . limit . unwrap ( ) , true ) ) ,
156
- self . spiller . clone ( ) ,
157
- self . output_order_col ,
158
- self . order_col_generated ,
159
- self . memory_settings . clone ( ) ,
245
+ self . params . sort_desc . clone ( ) ,
246
+ self . params . block_size ,
247
+ Some ( ( self . params . limit . unwrap ( ) , true ) ) ,
248
+ self . params . spiller . clone ( ) ,
249
+ self . params . output_order_col ,
250
+ self . params . order_col_generated ,
251
+ self . params . memory_settings . clone ( ) ,
160
252
) ?) )
161
253
}
162
254
163
- pub fn build_collect ( mut self ) -> Result < Box < dyn Processor > > {
164
- debug_assert ! ( if self . output_order_col {
165
- self . schema. has_field( ORDER_COL_NAME )
166
- } else {
167
- !self . schema. has_field( ORDER_COL_NAME )
168
- } ) ;
169
- self . typ = SortType :: Collect ;
170
-
171
- select_row_type ( & mut self ) ;
172
- self . processor . unwrap ( )
173
- }
174
-
175
255
fn build_sort_collect < A , C > ( & mut self ) -> Result < Box < dyn Processor > >
176
256
where
177
257
A : SortAlgorithm + ' static ,
178
258
C : RowConverter < A :: Rows > + Send + ' static ,
179
259
{
180
- let schema = add_order_field ( self . schema . clone ( ) , & self . sort_desc ) ;
260
+ let schema = add_order_field ( self . params . schema . clone ( ) , & self . params . sort_desc ) ;
181
261
182
262
Ok ( Box :: new ( TransformSortCollect :: < A , C > :: new (
183
263
self . input . clone ( ) ,
184
264
self . output . clone ( ) ,
185
265
schema,
186
- self . sort_desc . clone ( ) ,
187
- self . block_size ,
188
- self . limit . map ( |limit| ( limit, false ) ) ,
189
- self . spiller . clone ( ) ,
190
- self . order_col_generated ,
191
- self . memory_settings . clone ( ) ,
266
+ self . params . sort_desc . clone ( ) ,
267
+ self . params . block_size ,
268
+ self . params . limit . map ( |limit| ( limit, false ) ) ,
269
+ self . params . spiller . clone ( ) ,
270
+ self . params . order_col_generated ,
271
+ self . params . memory_settings . clone ( ) ,
192
272
) ?) )
193
273
}
194
274
@@ -197,54 +277,53 @@ impl TransformSortBuilder {
197
277
A : SortAlgorithm + ' static ,
198
278
C : RowConverter < A :: Rows > + Send + ' static ,
199
279
{
200
- let schema = add_order_field ( self . schema . clone ( ) , & self . sort_desc ) ;
280
+ let schema = add_order_field ( self . params . schema . clone ( ) , & self . params . sort_desc ) ;
201
281
Ok ( Box :: new ( TransformSortCollect :: < A , C > :: new (
202
282
self . input . clone ( ) ,
203
283
self . output . clone ( ) ,
204
284
schema,
205
- self . sort_desc . clone ( ) ,
206
- self . block_size ,
207
- Some ( ( self . limit . unwrap ( ) , true ) ) ,
208
- self . spiller . clone ( ) ,
209
- self . order_col_generated ,
210
- self . memory_settings . clone ( ) ,
285
+ self . params . sort_desc . clone ( ) ,
286
+ self . params . block_size ,
287
+ Some ( ( self . params . limit . unwrap ( ) , true ) ) ,
288
+ self . params . spiller . clone ( ) ,
289
+ self . params . order_col_generated ,
290
+ self . params . memory_settings . clone ( ) ,
211
291
) ?) )
212
292
}
213
293
214
- pub fn build_exec ( mut self ) -> Result < Box < dyn Processor > > {
215
- debug_assert ! ( if self . output_order_col {
216
- self . schema. has_field( ORDER_COL_NAME )
217
- } else {
218
- !self . schema. has_field( ORDER_COL_NAME )
219
- } ) ;
220
- self . typ = SortType :: Execute ;
221
-
222
- select_row_type ( & mut self ) ;
223
- self . processor . unwrap ( )
224
- }
225
-
226
294
fn build_sort_exec < A > ( & mut self ) -> Result < Box < dyn Processor > >
227
295
where A : SortAlgorithm + ' static {
228
- let schema = add_order_field ( self . schema . clone ( ) , & self . sort_desc ) ;
296
+ let schema = add_order_field ( self . params . schema . clone ( ) , & self . params . sort_desc ) ;
229
297
230
298
Ok ( Box :: new ( TransformSortExecute :: < A > :: new (
231
299
self . input . clone ( ) ,
232
300
self . output . clone ( ) ,
233
301
schema,
234
- self . limit ,
235
- self . spiller . clone ( ) ,
236
- self . output_order_col ,
302
+ self . params . limit ,
303
+ self . params . spiller . clone ( ) ,
304
+ self . params . output_order_col ,
237
305
) ?) )
238
306
}
307
+
308
+ fn build_sort_shuffle < R > ( & mut self ) -> Result < Box < dyn Processor > >
309
+ where R : Rows + ' static {
310
+ Ok ( Box :: new ( TransformSortShuffle :: < R > :: new (
311
+ self . input . clone ( ) ,
312
+ self . output . clone ( ) ,
313
+ self . id ,
314
+ self . state . clone ( ) . unwrap ( ) ,
315
+ self . params . spiller . clone ( ) ,
316
+ ) ) )
317
+ }
239
318
}
240
319
241
- impl RowsTypeVisitor for TransformSortBuilder {
320
+ impl RowsTypeVisitor for Build < ' _ > {
242
321
fn schema ( & self ) -> DataSchemaRef {
243
- self . schema . clone ( )
322
+ self . params . schema . clone ( )
244
323
}
245
324
246
325
/// Borrows the sort column descriptions held by the builder configuration.
fn sort_desc(&self) -> &[SortColumnDescription] {
    &self.params.sort_desc[..]
}
249
328
250
329
fn visit_type < R , C > ( & mut self )
@@ -254,27 +333,28 @@ impl RowsTypeVisitor for TransformSortBuilder {
254
333
{
255
334
let processor = match self . typ {
256
335
SortType :: Sort => match (
257
- self . limit . map ( |limit| limit < 10000 ) . unwrap_or_default ( ) ,
258
- self . enable_loser_tree ,
336
+ self . params . should_use_sort_limit ( ) ,
337
+ self . params . enable_loser_tree ,
259
338
) {
260
339
( true , true ) => self . build_sort_limit :: < LoserTreeSort < R > , C > ( ) ,
261
340
( true , false ) => self . build_sort_limit :: < HeapSort < R > , C > ( ) ,
262
341
( false , true ) => self . build_sort :: < LoserTreeSort < R > , C > ( ) ,
263
342
( false , false ) => self . build_sort :: < HeapSort < R > , C > ( ) ,
264
343
} ,
265
344
SortType :: Collect => match (
266
- self . limit . map ( |limit| limit < 10000 ) . unwrap_or_default ( ) ,
267
- self . enable_loser_tree ,
345
+ self . params . should_use_sort_limit ( ) ,
346
+ self . params . enable_loser_tree ,
268
347
) {
269
348
( true , true ) => self . build_sort_limit_collect :: < LoserTreeSort < R > , C > ( ) ,
270
349
( true , false ) => self . build_sort_limit_collect :: < HeapSort < R > , C > ( ) ,
271
350
( false , true ) => self . build_sort_collect :: < LoserTreeSort < R > , C > ( ) ,
272
351
( false , false ) => self . build_sort_collect :: < HeapSort < R > , C > ( ) ,
273
352
} ,
274
- SortType :: Execute => match self . enable_loser_tree {
353
+ SortType :: Execute => match self . params . enable_loser_tree {
275
354
true => self . build_sort_exec :: < LoserTreeSort < R > > ( ) ,
276
355
false => self . build_sort_exec :: < HeapSort < R > > ( ) ,
277
356
} ,
357
+ SortType :: Shuffle => self . build_sort_shuffle :: < R > ( ) ,
278
358
} ;
279
359
self . processor = Some ( processor)
280
360
}
0 commit comments