@@ -30,7 +30,7 @@ use databend_common_pipeline_transforms::sort::Merger;
30
30
use databend_common_pipeline_transforms:: sort:: Rows ;
31
31
use databend_common_pipeline_transforms:: sort:: SortedStream ;
32
32
33
- use crate :: pipelines :: processors :: transforms :: SortBound ;
33
+ use super :: SortBound ;
34
34
35
35
type Stream < A > = BoundedInputStream < <A as SortAlgorithm >:: Rows > ;
36
36
@@ -44,7 +44,7 @@ where A: SortAlgorithm
44
44
limit : Option < usize > ,
45
45
46
46
output_data : Option < DataBlock > ,
47
- bound_index : u32 ,
47
+ cur_index : u32 ,
48
48
inner : std:: result:: Result < Merger < A , Stream < A > > , Vec < Stream < A > > > ,
49
49
}
50
50
@@ -78,7 +78,7 @@ where A: SortAlgorithm
78
78
block_size,
79
79
limit,
80
80
output_data : None ,
81
- bound_index : 0 ,
81
+ cur_index : 0 ,
82
82
inner : Err ( streams) ,
83
83
} )
84
84
}
@@ -119,8 +119,8 @@ where A: SortAlgorithm + 'static
119
119
let merger = self . inner . as_mut ( ) . ok ( ) . unwrap ( ) ;
120
120
if let Some ( block) = merger. next_block ( ) ? {
121
121
self . output_data = Some ( block. add_meta ( Some ( SortBound :: create (
122
- self . bound_index ,
123
- ( !merger. is_finished ( ) ) . then_some ( self . bound_index ) ,
122
+ self . cur_index ,
123
+ ( !merger. is_finished ( ) ) . then_some ( self . cur_index ) ,
124
124
) ) ) ?) ;
125
125
} ;
126
126
Ok ( ( ) )
@@ -137,7 +137,7 @@ where A: SortAlgorithm + 'static
137
137
if !merger. is_finished ( ) {
138
138
return Ok ( Event :: Sync ) ;
139
139
}
140
- self . bound_index += 1 ;
140
+ self . cur_index += 1 ;
141
141
let merger = std:: mem:: replace ( inner, Err ( vec ! [ ] ) ) . ok ( ) . unwrap ( ) ;
142
142
self . inner = Err ( merger. streams ( ) ) ;
143
143
self . inner . as_mut ( ) . err ( ) . unwrap ( )
@@ -151,7 +151,7 @@ where A: SortAlgorithm + 'static
151
151
}
152
152
153
153
for stream in streams. iter_mut ( ) {
154
- stream. update_bound_index ( self . bound_index ) ;
154
+ stream. update_bound_index ( self . cur_index ) ;
155
155
}
156
156
157
157
self . inner = Ok ( Merger :: create (
@@ -233,14 +233,17 @@ impl<R: Rows> BoundedInputStream<R> {
233
233
. expect ( "require a SortBound" ) ;
234
234
235
235
let bound = self . bound . as_mut ( ) . unwrap ( ) ;
236
+ assert ! (
237
+ meta. index >= bound. bound_index,
238
+ "meta: {meta:?}, bound: {bound:?}" ,
239
+ ) ;
236
240
if meta. index == bound. bound_index {
237
241
bound. more = meta. next . is_some_and ( |next| next == meta. index ) ;
238
242
self . data . take ( ) . map ( |mut data| {
239
243
data. take_meta ( ) . unwrap ( ) ;
240
244
data
241
245
} )
242
246
} else {
243
- assert ! ( meta. index > bound. bound_index) ;
244
247
None
245
248
}
246
249
}
@@ -252,3 +255,103 @@ impl<R: Rows> BoundedInputStream<R> {
252
255
} ) ;
253
256
}
254
257
}
258
+
259
+ #[ cfg( test) ]
260
+ mod tests {
261
+ use std:: sync:: Arc ;
262
+
263
+ use databend_common_expression:: types:: Int32Type ;
264
+ use databend_common_expression:: FromData ;
265
+ use databend_common_pipeline_core:: processors:: connect;
266
+ use databend_common_pipeline_transforms:: sort:: SimpleRowsAsc ;
267
+
268
+ use super :: * ;
269
+
270
+ fn create_block ( empty : bool , index : u32 , next : Option < u32 > ) -> DataBlock {
271
+ let block = DataBlock :: new_from_columns ( vec ! [ Int32Type :: from_data( vec![ 1 , 2 , 3 ] ) ] ) ;
272
+ let block = if empty { block. slice ( 0 ..0 ) } else { block } ;
273
+ block
274
+ . add_meta ( Some ( SortBound :: create ( index, next) ) )
275
+ . unwrap ( )
276
+ }
277
+
278
+ fn create_stream ( ) -> (
279
+ BoundedInputStream < SimpleRowsAsc < Int32Type > > ,
280
+ Arc < OutputPort > ,
281
+ ) {
282
+ let output = OutputPort :: create ( ) ;
283
+ let input = InputPort :: create ( ) ;
284
+ unsafe {
285
+ connect ( & input, & output) ;
286
+ }
287
+
288
+ let stream = BoundedInputStream {
289
+ data : None ,
290
+ input,
291
+ remove_order_col : false ,
292
+ sort_row_offset : 0 ,
293
+ bound : None ,
294
+ _r : PhantomData ,
295
+ } ;
296
+ ( stream, output)
297
+ }
298
+
299
+ #[ test]
300
+ fn test_bounded_input_stream ( ) {
301
+ let ( mut stream, output) = create_stream ( ) ;
302
+
303
+ stream. update_bound_index ( 0 ) ;
304
+
305
+ {
306
+ let ( _, pending) = stream. next ( ) . unwrap ( ) ;
307
+
308
+ assert ! ( stream. bound. unwrap( ) . more) ;
309
+ assert ! ( pending) ;
310
+ }
311
+
312
+ {
313
+ let block = create_block ( true , 0 , Some ( 0 ) ) ;
314
+ output. push_data ( Ok ( block) ) ;
315
+
316
+ let ( _, pending) = stream. next ( ) . unwrap ( ) ;
317
+
318
+ assert ! ( stream. bound. unwrap( ) . more) ;
319
+ assert ! ( pending) ;
320
+ }
321
+
322
+ {
323
+ let block = create_block ( false , 0 , Some ( 0 ) ) ;
324
+ output. push_data ( Ok ( block) ) ;
325
+
326
+ let ( data, pending) = stream. next ( ) . unwrap ( ) ;
327
+ assert ! ( !pending) ;
328
+ let data = data. unwrap ( ) ;
329
+ assert ! ( data. 0 . get_meta( ) . is_none( ) ) ;
330
+ assert_eq ! ( data. 1 . len( ) , 3 ) ;
331
+ }
332
+
333
+ {
334
+ let block = create_block ( true , 0 , Some ( 1 ) ) ;
335
+ output. push_data ( Ok ( block) ) ;
336
+
337
+ let ( data, pending) = stream. next ( ) . unwrap ( ) ;
338
+
339
+ assert ! ( data. is_none( ) ) ;
340
+ assert ! ( !stream. bound. unwrap( ) . more) ;
341
+ assert ! ( pending) ;
342
+
343
+ let block = create_block ( false , 1 , Some ( 1 ) ) ;
344
+ output. push_data ( Ok ( block) ) ;
345
+
346
+ let ( data, pending) = stream. next ( ) . unwrap ( ) ;
347
+ assert ! ( data. is_none( ) ) ;
348
+ assert ! ( !stream. bound. unwrap( ) . more) ;
349
+ assert ! ( !pending) ;
350
+
351
+ let ( data, pending) = stream. next ( ) . unwrap ( ) ;
352
+ assert ! ( data. is_none( ) ) ;
353
+ assert ! ( !stream. bound. unwrap( ) . more) ;
354
+ assert ! ( !pending) ;
355
+ }
356
+ }
357
+ }
0 commit comments