From 66640e907d3b1499897eef2a951a8fad0b98bde9 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 16 May 2024 18:46:15 -0400 Subject: [PATCH] Consolidate as part of TraceFrontier map_times --- src/trace/wrappers/frontier.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index 02cce0dd1..ddbed73ed 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -109,18 +109,20 @@ impl BatchFrontier { } /// Wrapper to provide cursor to nested scope. -pub struct CursorFrontier { +pub struct CursorFrontier { cursor: C, since: Antichain, - until: Antichain + until: Antichain, + buffer: Vec<(C::Time, C::Diff)>, } -impl CursorFrontier where T: Clone { +impl CursorFrontier where T: Clone { fn new(cursor: C, since: AntichainRef, until: AntichainRef) -> Self { CursorFrontier { cursor, since: since.to_owned(), until: until.to_owned(), + buffer: Vec::new(), } } } @@ -149,9 +151,13 @@ impl Cursor for CursorFrontier { temp.clone_from(time); temp.advance_by(since); if !until.less_equal(&temp) { - logic(&temp, diff); + self.buffer.push((temp.clone(), diff.clone())); } - }) + }); + crate::consolidation::consolidate(&mut self.buffer); + for (time, diff) in self.buffer.drain(..) { + logic(&time, &diff); + } } #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }