Skip to content

Commit d3bf8c2

Browse files
rebase on master (TimelyDataflow#312)
1 parent afa4413 commit d3bf8c2

File tree

11 files changed

+12
-12
lines changed

11 files changed

+12
-12
lines changed

src/operators/arrange/agent.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ where
7474
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
7575
self.trace.borrow_mut().trace.cursor_through(frontier)
7676
}
77-
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, f: F) { self.trace.borrow_mut().trace.map_batches(f) }
77+
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
7878
}
7979

8080
impl<Tr> TraceAgent<Tr>

src/trace/implementations/graph.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ where
6060
}
6161
fn get_physical_compaction(&mut self) -> &[Product<RootTimestamp,()>] { &self.spine.get_physical_compaction() }
6262

63-
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, f: F) {
63+
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
6464
self.spine.map_batches(f)
6565
}
6666
}

src/trace/implementations/spine_fueled.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ where
215215
}
216216
fn get_physical_compaction(&mut self) -> &[T] { &self.physical_frontier[..] }
217217

218-
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, mut f: F) {
218+
fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
219219
for batch in self.merging.iter().rev() {
220220
match *batch {
221221
Some(MergeState::Merging(ref batch1, ref batch2, _, _)) => { f(batch1); f(batch2); },

src/trace/implementations/spine_fueled_neu.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ where
224224
}
225225
fn get_physical_compaction(&mut self) -> AntichainRef<T> { self.physical_frontier.borrow() }
226226

227-
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, mut f: F) {
227+
fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
228228
for batch in self.merging.iter().rev() {
229229
match batch {
230230
MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => { f(batch1); f(batch2); },

src/trace/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ pub trait TraceReader {
154154
/// This is currently used only to extract historical data to prime late-starting operators who want to reproduce
155155
/// the stream of batches moving past the trace. It could also be a fine basis for a default implementation of the
156156
/// cursor methods, as they (by default) just move through batches accumulating cursors into a cursor list.
157-
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, f: F);
157+
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);
158158

159159
/// Reads the upper frontier of committed times.
160160
///

src/trace/wrappers/enter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ where
5050
type Batch = BatchEnter<Tr::Key, Tr::Val, Tr::Time, Tr::R, Tr::Batch, TInner>;
5151
type Cursor = CursorEnter<Tr::Key, Tr::Val, Tr::Time, Tr::R, Tr::Cursor, TInner>;
5252

53-
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, mut f: F) {
53+
fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
5454
self.trace.map_batches(|batch| {
5555
f(&Self::Batch::make_from(batch.clone()));
5656
})

src/trace/wrappers/enter_at.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ where
6565
type Batch = BatchEnter<Tr::Key, Tr::Val, Tr::Time, Tr::R, Tr::Batch, TInner,F>;
6666
type Cursor = CursorEnter<Tr::Key, Tr::Val, Tr::Time, Tr::R, Tr::Cursor, TInner,F>;
6767

68-
fn map_batches<F2: FnMut(&Self::Batch)>(&mut self, mut f: F2) {
68+
fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
6969
let logic = self.logic.clone();
7070
self.trace.map_batches(|batch| {
7171
f(&Self::Batch::make_from(batch.clone(), logic.clone()));

src/trace/wrappers/filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ where
4343
type Batch = BatchFilter<Tr::Key, Tr::Val, Tr::Time, Tr::R, Tr::Batch, F>;
4444
type Cursor = CursorFilter<Tr::Key, Tr::Val, Tr::Time, Tr::R, Tr::Cursor, F>;
4545

46-
fn map_batches<F2: FnMut(&Self::Batch)>(&mut self, mut f: F2) {
46+
fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
4747
let logic = self.logic.clone();
4848
self.trace
4949
.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), logic.clone())))

src/trace/wrappers/freeze.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ where
9696
type Batch = BatchFreeze<Tr::Key, Tr::Val, Tr::Time, Tr::R, Tr::Batch, F>;
9797
type Cursor = CursorFreeze<Tr::Key, Tr::Val, Tr::Time, Tr::R, Tr::Cursor, F>;
9898

99-
fn map_batches<F2: FnMut(&Self::Batch)>(&mut self, mut f: F2) {
99+
fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
100100
let func = &self.func;
101101
self.trace.map_batches(|batch| {
102102
f(&Self::Batch::make_from(batch.clone(), func.clone()));

src/trace/wrappers/frontier.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ where
5050
type Batch = BatchFrontier<Tr::Key, Tr::Val, Tr::Time, Tr::R, Tr::Batch>;
5151
type Cursor = CursorFrontier<Tr::Key, Tr::Val, Tr::Time, Tr::R, Tr::Cursor>;
5252

53-
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, mut f: F) {
53+
fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
5454
let frontier = self.frontier.borrow();
5555
self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), frontier)))
5656
}

0 commit comments

Comments
 (0)