98
98
//! }).unwrap();
99
99
//! ```
100
100
101
- use std:: collections:: { BinaryHeap , HashMap } ;
101
+ use std:: collections:: { BinaryHeap , BTreeMap } ;
102
102
103
103
use timely:: order:: { PartialOrder , TotalOrder } ;
104
104
use timely:: dataflow:: { Scope , Stream } ;
@@ -216,7 +216,7 @@ where
216
216
}
217
217
218
218
// Extract upserts available to process as of this `upper`.
219
- let mut to_process = HashMap :: new ( ) ;
219
+ let mut to_process = BTreeMap :: new ( ) ;
220
220
while priority_queue. peek ( ) . map ( |std:: cmp:: Reverse ( ( t, _k, _v) ) | !upper. less_equal ( t) ) . unwrap_or ( false ) {
221
221
let std:: cmp:: Reverse ( ( time, key, val) ) = priority_queue. pop ( ) . expect ( "Priority queue just ensured non-empty" ) ;
222
222
to_process. entry ( key) . or_insert ( Vec :: new ( ) ) . push ( ( time, std:: cmp:: Reverse ( val) ) ) ;
@@ -229,15 +229,11 @@ where
229
229
priority_queue. shrink_to_fit ( ) ;
230
230
}
231
231
232
- // Put (key, list) into key order, to match cursor enumeration.
233
- let mut to_process = to_process. into_iter ( ) . collect :: < Vec < _ > > ( ) ;
234
- to_process. sort ( ) ;
235
-
236
232
// Prepare a cursor to the existing arrangement, and a batch builder for
237
233
// new stuff that we add.
238
234
let ( mut trace_cursor, trace_storage) = reader_local. cursor ( ) ;
239
235
let mut builder = Bu :: new ( ) ;
240
- for ( key, mut list) in to_process. drain ( .. ) {
236
+ for ( key, mut list) in to_process {
241
237
242
238
// The prior value associated with the key.
243
239
let mut prev_value: Option < V > = None ;
0 commit comments