@@ -51,6 +51,7 @@ pub struct TransformSortCollect<A: SortAlgorithm, C> {
51
51
output : Arc < OutputPort > ,
52
52
output_data : Option < DataBlock > ,
53
53
54
+ max_block_size : usize ,
54
55
row_converter : C ,
55
56
sort_desc : Arc < [ SortColumnDescription ] > ,
56
57
/// If this transform is after an Exchange transform,
@@ -102,6 +103,7 @@ where
102
103
inner,
103
104
aborting : AtomicBool :: new ( false ) ,
104
105
memory_settings,
106
+ max_block_size,
105
107
} )
106
108
}
107
109
@@ -117,12 +119,19 @@ where
117
119
Ok ( ( rows, block) )
118
120
}
119
121
120
- fn limit_trans_to_spill ( & mut self ) -> Result < ( ) > {
122
+ fn limit_trans_to_spill ( & mut self , no_spill : bool ) -> Result < ( ) > {
121
123
let Inner :: Limit ( merger) = & self . inner else {
122
124
unreachable ! ( )
123
125
} ;
124
126
assert ! ( merger. num_rows( ) > 0 ) ;
125
- let params = self . determine_params ( merger. num_bytes ( ) , merger. num_rows ( ) ) ;
127
+ let params = if no_spill {
128
+ SortSpillParams {
129
+ batch_rows : self . max_block_size ,
130
+ num_merge : merger. num_rows ( ) . div_ceil ( self . max_block_size ) ,
131
+ }
132
+ } else {
133
+ self . determine_params ( merger. num_bytes ( ) , merger. num_rows ( ) )
134
+ } ;
126
135
let Inner :: Limit ( merger) = & mut self . inner else {
127
136
unreachable ! ( )
128
137
} ;
@@ -132,25 +141,32 @@ where
132
141
Ok ( ( ) )
133
142
}
134
143
135
- fn collect_trans_to_spill ( & mut self , input_data : Vec < DataBlock > ) {
144
+ fn collect_trans_to_spill ( & mut self , input_data : Vec < DataBlock > , no_spill : bool ) {
136
145
let ( num_rows, num_bytes) = input_data
137
146
. iter ( )
138
147
. map ( |block| ( block. num_rows ( ) , block. memory_size ( ) ) )
139
148
. fold ( ( 0 , 0 ) , |( acc_rows, acc_bytes) , ( rows, bytes) | {
140
149
( acc_rows + rows, acc_bytes + bytes)
141
150
} ) ;
142
151
assert ! ( num_rows > 0 ) ;
143
- let params = self . determine_params ( num_bytes, num_rows) ;
152
+ let params = if no_spill {
153
+ SortSpillParams {
154
+ batch_rows : self . max_block_size ,
155
+ num_merge : num_rows. div_ceil ( self . max_block_size ) ,
156
+ }
157
+ } else {
158
+ self . determine_params ( num_bytes, num_rows)
159
+ } ;
144
160
let spill_sort = SortSpill :: new ( self . base . clone ( ) , params) ;
145
161
self . inner = Inner :: Spill ( input_data, spill_sort) ;
146
162
}
147
163
148
- fn trans_to_spill ( & mut self ) -> Result < ( ) > {
164
+ fn trans_to_spill ( & mut self , no_spill : bool ) -> Result < ( ) > {
149
165
match & mut self . inner {
150
- Inner :: Limit ( _) => self . limit_trans_to_spill ( ) ,
166
+ Inner :: Limit ( _) => self . limit_trans_to_spill ( no_spill ) ,
151
167
Inner :: Collect ( input_data) => {
152
168
let input_data = std:: mem:: take ( input_data) ;
153
- self . collect_trans_to_spill ( input_data) ;
169
+ self . collect_trans_to_spill ( input_data, no_spill ) ;
154
170
Ok ( ( ) )
155
171
}
156
172
Inner :: Spill ( _, _) => Ok ( ( ) ) ,
@@ -322,18 +338,19 @@ where
322
338
#[ async_backtrace:: framed]
323
339
async fn async_process ( & mut self ) -> Result < ( ) > {
324
340
let finished = self . input . is_finished ( ) ;
325
- self . trans_to_spill ( ) ?;
341
+ self . trans_to_spill ( finished ) ?;
326
342
327
- let input = self . input_rows ( ) ;
328
343
let Inner :: Spill ( input_data, spill_sort) = & mut self . inner else {
329
344
unreachable ! ( )
330
345
} ;
346
+
347
+ let input = input_data. in_memory_rows ( ) ;
331
348
let memory_rows = spill_sort. collect_memory_rows ( ) ;
332
349
let max = spill_sort. max_rows ( ) ;
333
350
334
351
if memory_rows > 0 && memory_rows + input > max {
335
352
spill_sort
336
- . subsequent_spill_last ( memory_rows + input - max)
353
+ . collect_spill_last ( memory_rows + input - max)
337
354
. await ?;
338
355
}
339
356
if input > max || finished && input > 0 {
0 commit comments