 use std::{
-    cmp::{max, Ordering},
+    cmp::{max, min, Ordering},
     collections::{binary_heap::PeekMut, BinaryHeap},
     mem,
 };
@@ -19,6 +19,7 @@ pub(crate) struct Assembler {
     /// length of the contiguous prefix of the stream which has been consumed by the application,
     /// aka the stream offset.
     bytes_read: u64,
+    end: u64,
 }
 
 impl Assembler {
@@ -175,6 +176,7 @@ impl Assembler {
             allocation_size,
             bytes.len()
         );
+        self.end = max(self.end, offset + bytes.len() as u64);
         if let State::Unordered { ref mut recvd } = self.state {
             // Discard duplicate data
             for duplicate in recvd.replace(offset..offset + bytes.len() as u64) {
@@ -216,9 +218,13 @@ impl Assembler {
         // of memory allocated. This limits over-allocation in proportion to the
         // buffered data. The constants are chosen somewhat arbitrarily and try to
         // balance between defragmentation overhead and over-allocation.
-        let over_allocation = self.allocated - self.buffered;
-        let threshold = max(self.buffered * 3 / 2, 32 * 1024);
+        let buffered = min(self.buffered, (self.end - self.bytes_read) as usize);
+        let over_allocation = self.allocated - buffered;
+        let threshold = max(buffered * 3 / 2, 32 * 1024);
         if over_allocation > threshold {
+            if self.state.is_ordered() && self.buffered > buffered {
+                self.deduplicate();
+            }
             self.defragment()
         }
     }
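To make the changed heuristic concrete, here is a small standalone sketch of the over-allocation check after this patch. The `Counters` struct, `should_defragment` helper, and the numbers in `main` are illustrative stand-ins, not the real `Assembler` internals; only the arithmetic mirrors the diff above.

use std::cmp::{max, min};

/// Illustrative stand-in for the counters the diff operates on.
struct Counters {
    allocated: usize, // total bytes of allocation backing buffered chunks
    buffered: usize,  // bytes currently held, possibly counting duplicates
    bytes_read: u64,  // contiguous prefix already consumed by the application
    end: u64,         // highest received offset (the new field)
}

impl Counters {
    /// Mirrors the updated check: cap `buffered` by the span of data that can
    /// still be useful (`end - bytes_read`), so duplicate data does not inflate
    /// the threshold and hide over-allocation.
    fn should_defragment(&self) -> bool {
        let buffered = min(self.buffered, (self.end - self.bytes_read) as usize);
        let over_allocation = self.allocated - buffered;
        let threshold = max(buffered * 3 / 2, 32 * 1024);
        over_allocation > threshold
    }
}

fn main() {
    // Hypothetical numbers: 200 KiB allocated, 64 KiB buffered, but only
    // 16 KiB of it lies beyond the read offset (the rest is duplicate data).
    let c = Counters {
        allocated: 200 * 1024,
        buffered: 64 * 1024,
        bytes_read: 0,
        end: 16 * 1024,
    };
    // buffered is capped at 16 KiB, so over_allocation = 184 KiB, which exceeds
    // threshold = max(24 KiB, 32 KiB) = 32 KiB; the real code would then
    // deduplicate (if ordered) and defragment.
    assert!(c.should_defragment());
}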