From 0e0493a379bf5e632189c096c8583a7de598eff7 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Mon, 5 Jun 2023 17:44:54 +0200 Subject: [PATCH] join: simplify logic passed to `Deferred::work` Previously, the join code had to wrap the `result` closure into another closure to invoke `Deferred::work`, to deal with the fact that matches from the `todo1` list had their value and diff fields swapped. Instead of having a closure to unswap the fields, we can simply pass them to the `Deferred` constructor in the correct order instead. That is, the `Deferred` constructor should always receive the cursor/storage from input 1 first and from input 2 second. If we make this change, then a) no swapping closure is necessary, and b) the input values match the types (`C1`, `C2`) of the `Deferred` struct. --- src/operators/join.rs | 52 +++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/src/operators/join.rs b/src/operators/join.rs index 8710ac07b..1d82671b7 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -499,7 +499,7 @@ impl JoinCore for Arranged // at start-up, and have held back physical compaction ever since. let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap(); let batch1_cursor = batch1.cursor(); - todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone())); + todo1.push_back(Deferred::new(batch1_cursor, batch1.clone(), trace2_cursor, trace2_storage, capability.clone())); } // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we @@ -564,7 +564,7 @@ impl JoinCore for Arranged while !todo1.is_empty() && fuel > 0 { todo1.front_mut().unwrap().work( output, - |k,v2,v1,t,r2,r1| result(k,v1,v2,t,r1,r2), + &mut result, &mut fuel ); if !todo1.front().unwrap().work_remains() { todo1.pop_front(); } @@ -575,7 +575,7 @@ impl JoinCore for Arranged while !todo2.is_empty() && fuel > 0 { todo2.front_mut().unwrap().work( output, - |k,v1,v2,t,r1,r2| result(k,v1,v2,t,r1,r2), + &mut result, &mut fuel ); if !todo2.front().unwrap().work_remains() { todo2.pop_front(); } @@ -648,10 +648,10 @@ where D: Ord+Clone+Data, { phant: ::std::marker::PhantomData, - trace: C1, - trace_storage: C1::Storage, - batch: C2, - batch_storage: C2::Storage, + cursor1: C1, + storage1: C1::Storage, + cursor2: C2, + storage2: C2::Storage, capability: Capability, done: bool, temp: Vec<((D, T), R)>, @@ -670,13 +670,13 @@ where R: Semigroup, D: Clone+Data, { - fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability) -> Self { + fn new(cursor1: C1, storage1: C1::Storage, cursor2: C2, storage2: C2::Storage, capability: Capability) -> Self { Deferred { phant: ::std::marker::PhantomData, - trace, - trace_storage, - batch, - batch_storage, + cursor1, + storage1, + cursor2, + storage2, capability, done: false, temp: Vec::new(), @@ -697,30 +697,30 @@ where let mut effort = 0; let mut session = output.session(&self.capability); - let trace_storage = &self.trace_storage; - let batch_storage = &self.batch_storage; + let storage1 = &self.storage1; + let storage2 = &self.storage2; - let trace = &mut self.trace; - let batch = &mut self.batch; + let cursor1 = &mut self.cursor1; + let cursor2 = &mut self.cursor2; let temp = &mut self.temp; let mut thinker = JoinThinker::new(); - while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel { + while cursor1.key_valid(storage1) && cursor2.key_valid(storage2) && effort < *fuel { - match trace.key(trace_storage).cmp(batch.key(batch_storage)) { - Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)), - Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)), + match cursor1.key(storage1).cmp(cursor2.key(storage2)) { + Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)), + Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)), Ordering::Equal => { - thinker.history1.edits.load(trace, trace_storage, |time| time.join(&meet)); - thinker.history2.edits.load(batch, batch_storage, |time| time.clone()); + thinker.history1.edits.load(cursor1, storage1, |time| time.join(meet)); + thinker.history2.edits.load(cursor2, storage2, |time| time.join(meet)); assert_eq!(temp.len(), 0); // populate `temp` with the results in the best way we know how. thinker.think(|v1,v2,t,r1,r2| { - let key = batch.key(batch_storage); + let key = cursor1.key(storage1); for (d, t, r) in logic(key, v1, v2, &t, r1, r2) { temp.push(((d, t), r)); } @@ -738,8 +738,8 @@ where session.give((d, t, r)); } - batch.step_key(batch_storage); - trace.step_key(trace_storage); + cursor1.step_key(storage1); + cursor2.step_key(storage2); thinker.history1.clear(); thinker.history2.clear(); @@ -747,7 +747,7 @@ where } } - self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage); + self.done = !cursor1.key_valid(storage1) || !cursor2.key_valid(storage2); if effort > *fuel { *fuel = 0; } else { *fuel -= effort; }