File tree Expand file tree Collapse file tree 3 files changed +10
-5
lines changed
service/src/pipelines/processors/transforms/window/partition
storages/fuse/src/io/write/stream Expand file tree Collapse file tree 3 files changed +10
-5
lines changed Original file line number Diff line number Diff line change @@ -211,7 +211,8 @@ impl Processor for TransformHilbertCollect {
211
211
}
212
212
State :: Flush => {
213
213
if let Some ( ( partition_id, data_block) ) = self . immediate_output_blocks . pop ( ) {
214
- let mut restored_data_blocks = self . buffer . restore_by_id ( partition_id) . await ?;
214
+ let mut restored_data_blocks =
215
+ self . buffer . restore_by_id ( partition_id, true ) . await ?;
215
216
restored_data_blocks. push ( data_block) ;
216
217
self . state = State :: Concat ( restored_data_blocks) ;
217
218
}
Original file line number Diff line number Diff line change @@ -145,15 +145,19 @@ impl WindowPartitionBuffer {
145
145
while self . next_to_restore_partition_id + 1 < self . num_partitions as isize {
146
146
self . next_to_restore_partition_id += 1 ;
147
147
let partition_id = self . next_to_restore_partition_id as usize ;
148
- let result = self . restore_by_id ( partition_id) . await ?;
148
+ let result = self . restore_by_id ( partition_id, false ) . await ?;
149
149
if !result. is_empty ( ) {
150
150
return Ok ( result) ;
151
151
}
152
152
}
153
153
Ok ( vec ! [ ] )
154
154
}
155
155
156
- pub async fn restore_by_id ( & mut self , partition_id : usize ) -> Result < Vec < DataBlock > > {
156
+ pub async fn restore_by_id (
157
+ & mut self ,
158
+ partition_id : usize ,
159
+ partial_restore : bool ,
160
+ ) -> Result < Vec < DataBlock > > {
157
161
// Restore large partitions from spilled files.
158
162
let mut result = self . spiller . take_spilled_partition ( & partition_id) . await ?;
159
163
@@ -171,7 +175,7 @@ impl WindowPartitionBuffer {
171
175
location,
172
176
partitions,
173
177
} = merged_partitions;
174
- if out_of_memory_limit || * partial_restored {
178
+ if out_of_memory_limit || * partial_restored || partial_restore {
175
179
if let Some ( pos) = partitions. iter ( ) . position ( |( id, _) | * id == partition_id) {
176
180
let data_block = self
177
181
. spiller
Original file line number Diff line number Diff line change @@ -242,7 +242,7 @@ impl StreamBlockBuilder {
242
242
self . row_count >= self . properties . block_thresholds . min_rows_per_block
243
243
|| self . block_size >= self . properties . block_thresholds . max_bytes_per_block
244
244
|| ( file_size >= self . properties . block_thresholds . min_compressed_per_block
245
- && self . block_size >= self . properties . block_thresholds . min_bytes_per_block )
245
+ && self . block_size >= self . properties . block_thresholds . min_bytes_per_block )
246
246
}
247
247
248
248
pub fn write ( & mut self , block : DataBlock ) -> Result < ( ) > {
You can’t perform that action at this time.
0 commit comments