Skip to content

Commit 48f11f7

Browse files
Respect singleton counts in merge effort (#614)
1 parent 3418974 commit 48f11f7

File tree

1 file changed

+8
-8
lines changed
  • differential-dataflow/src/trace/implementations

1 file changed

+8
-8
lines changed

differential-dataflow/src/trace/implementations/ord_neu.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -270,27 +270,27 @@ pub mod val_batch {
270270
fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
271271

272272
// An (incomplete) indication of the amount of work we've done so far.
273-
let starting_updates = self.result.times.len();
273+
let starting_updates = self.result.times.len() + self.singletons;
274274
let mut effort = 0isize;
275275

276276
// While both mergees are still active, perform single-key merges.
277277
while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
278278
self.merge_key(&source1.storage, &source2.storage);
279279
// An (incomplete) accounting of the work we've done.
280-
effort = (self.result.times.len() - starting_updates) as isize;
280+
effort = (self.result.times.len() + self.singletons - starting_updates) as isize;
281281
}
282282

283283
// Merging is complete, and only copying remains.
284284
// Key-by-key copying allows effort interruption, and compaction.
285285
while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
286286
self.copy_key(&source1.storage, self.key_cursor1);
287287
self.key_cursor1 += 1;
288-
effort = (self.result.times.len() - starting_updates) as isize;
288+
effort = (self.result.times.len() + self.singletons - starting_updates) as isize;
289289
}
290290
while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
291291
self.copy_key(&source2.storage, self.key_cursor2);
292292
self.key_cursor2 += 1;
293-
effort = (self.result.times.len() - starting_updates) as isize;
293+
effort = (self.result.times.len() + self.singletons - starting_updates) as isize;
294294
}
295295

296296
*fuel -= effort;
@@ -864,27 +864,27 @@ pub mod key_batch {
864864
fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
865865

866866
// An (incomplete) indication of the amount of work we've done so far.
867-
let starting_updates = self.result.times.len();
867+
let starting_updates = self.result.times.len() + self.singletons;
868868
let mut effort = 0isize;
869869

870870
// While both mergees are still active, perform single-key merges.
871871
while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
872872
self.merge_key(&source1.storage, &source2.storage);
873873
// An (incomplete) accounting of the work we've done.
874-
effort = (self.result.times.len() - starting_updates) as isize;
874+
effort = (self.result.times.len() + self.singletons - starting_updates) as isize;
875875
}
876876

877877
// Merging is complete, and only copying remains.
878878
// Key-by-key copying allows effort interruption, and compaction.
879879
while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
880880
self.copy_key(&source1.storage, self.key_cursor1);
881881
self.key_cursor1 += 1;
882-
effort = (self.result.times.len() - starting_updates) as isize;
882+
effort = (self.result.times.len() + self.singletons - starting_updates) as isize;
883883
}
884884
while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
885885
self.copy_key(&source2.storage, self.key_cursor2);
886886
self.key_cursor2 += 1;
887-
effort = (self.result.times.len() - starting_updates) as isize;
887+
effort = (self.result.times.len() + self.singletons - starting_updates) as isize;
888888
}
889889

890890
*fuel -= effort;

0 commit comments

Comments
 (0)