@@ -41,7 +41,6 @@ where A: SortAlgorithm
41
41
output : Arc < OutputPort > ,
42
42
schema : DataSchemaRef ,
43
43
block_size : usize ,
44
- limit : Option < usize > ,
45
44
46
45
output_data : Option < DataBlock > ,
47
46
cur_index : u32 ,
@@ -56,7 +55,6 @@ where A: SortAlgorithm
56
55
output : Arc < OutputPort > ,
57
56
schema : DataSchemaRef ,
58
57
block_size : usize ,
59
- limit : Option < usize > ,
60
58
remove_order_col : bool ,
61
59
) -> Result < Self > {
62
60
let streams = inputs
@@ -76,7 +74,6 @@ where A: SortAlgorithm
76
74
output,
77
75
schema,
78
76
block_size,
79
- limit,
80
77
output_data : None ,
81
78
cur_index : 0 ,
82
79
inner : Err ( streams) ,
@@ -145,11 +142,12 @@ where A: SortAlgorithm + 'static
145
142
Err ( streams) => streams,
146
143
} ;
147
144
148
- if streams. iter ( ) . all ( |stream| stream. input . is_finished ( ) ) {
145
+ if streams. iter ( ) . all ( |stream| stream. is_finished ( ) ) {
149
146
self . output . finish ( ) ;
150
147
return Ok ( Event :: Finished ) ;
151
148
}
152
149
150
+ log:: debug!( "create merger cur_index {}" , self . cur_index) ;
153
151
for stream in streams. iter_mut ( ) {
154
152
stream. update_bound_index ( self . cur_index ) ;
155
153
}
@@ -158,7 +156,7 @@ where A: SortAlgorithm + 'static
158
156
self . schema . clone ( ) ,
159
157
std:: mem:: take ( streams) ,
160
158
self . block_size ,
161
- self . limit ,
159
+ None ,
162
160
) ) ;
163
161
Ok ( Event :: Sync )
164
162
}
@@ -211,8 +209,7 @@ impl<R: Rows> BoundedInputStream<R> {
211
209
return Ok ( false ) ;
212
210
}
213
211
214
- if self . input . has_data ( ) {
215
- let block = self . input . pull_data ( ) . unwrap ( ) ?;
212
+ if let Some ( block) = self . input . pull_data ( ) . transpose ( ) ? {
216
213
self . input . set_need_data ( ) ;
217
214
self . data = Some ( block) ;
218
215
Ok ( false )
@@ -254,6 +251,10 @@ impl<R: Rows> BoundedInputStream<R> {
254
251
more : true ,
255
252
} ) ;
256
253
}
254
+
255
+ fn is_finished ( & self ) -> bool {
256
+ self . input . is_finished ( ) && self . data . is_none ( )
257
+ }
257
258
}
258
259
259
260
#[ cfg( test) ]
0 commit comments