@@ -145,6 +145,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>
145
145
// in singleton, the partition is 8, 32, 128.
146
146
// We pull the first data to ensure the max partition,
147
147
// and then pull all data that is less than the max partition
148
+ let mut refresh_index = 0 ;
148
149
for index in 0 ..self . inputs . len ( ) {
149
150
if self . inputs [ index] . port . is_finished ( ) {
150
151
continue ;
@@ -180,12 +181,21 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>
180
181
self . initialized_all_inputs = false ;
181
182
}
182
183
183
- // handle the case where the last input changes the max partition
184
- if index == self . inputs . len ( ) - 1
185
- && before_max_partition_count > 0
184
+ // max partition count change
185
+ if before_max_partition_count > 0
186
186
&& before_max_partition_count != self . max_partition_count
187
187
{
188
- self . initialized_all_inputs = false ;
188
+ // set need data for inputs which is less than the max partition
189
+ for i in refresh_index..index {
190
+ if !self . inputs [ i] . port . is_finished ( )
191
+ && !self . inputs [ i] . port . has_data ( )
192
+ && self . inputs [ i] . max_partition_count != self . max_partition_count
193
+ {
194
+ self . inputs [ i] . port . set_need_data ( ) ;
195
+ self . initialized_all_inputs = false ;
196
+ }
197
+ }
198
+ refresh_index = index;
189
199
}
190
200
}
191
201
}
@@ -292,6 +302,12 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>
292
302
}
293
303
294
304
if self . all_inputs_init {
305
+ if partition_count != self . max_partition_count {
306
+ return Err ( ErrorCode :: Internal (
307
+ "Internal, the partition count does not equal the max partition count on TransformPartitionBucket.
308
+ " ,
309
+ ) ) ;
310
+ }
295
311
match self . buckets_blocks . entry ( bucket) {
296
312
Entry :: Vacant ( v) => {
297
313
v. insert ( vec ! [ data_block] ) ;
0 commit comments