Skip to content

Commit 870de8c

Browse files
committed
compute: simplify mz_join_core result computation
Previously, the mz_join_core code wrapped the `result` closure in another `work_result` closure, to deal with the fact that matches from the `todo1` list had their value and diff fields swapped. Rather than using a closure to unswap the fields, we can simply pass the cursors and their storage to the `Deferred` constructor in the correct order. That is, the `Deferred` constructor should always receive the cursor/storage from input 1 first and from input 2 second. With this change, no `work_result` closure is necessary and the code becomes easier to reason about.
1 parent ee878f6 commit 870de8c

File tree

1 file changed

+45
-55
lines changed

1 file changed

+45
-55
lines changed

src/compute/src/render/join/mz_join_core.rs

Lines changed: 45 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,10 @@ where
198198
trace2.cursor_through(acknowledged2.borrow()).unwrap();
199199
let batch1_cursor = batch1.cursor();
200200
todo1.push_back(Deferred::new(
201-
trace2_cursor,
202-
trace2_storage,
203201
batch1_cursor,
204202
batch1.clone(),
203+
trace2_cursor,
204+
trace2_storage,
205205
capability.clone(),
206206
));
207207
}
@@ -273,27 +273,13 @@ where
273273
// which results in unintentionally quadratic processing time (each batch of either
274274
// input must scan all batches from the other input).
275275

276-
let mut work_result = |k: &Tr1::Key,
277-
v1: &Tr1::Val,
278-
v2: &Tr2::Val,
279-
t: &G::Timestamp,
280-
r1: &Tr1::R,
281-
r2: &Tr2::R| {
282-
let t = t.clone();
283-
let r = r1.clone().multiply(r2);
284-
result(k, v1, v2)
285-
.into_iter()
286-
.map(move |d| (d, t.clone(), r.clone()))
287-
};
288-
289276
// Perform some amount of outstanding work.
290277
let mut fuel = 1_000_000;
291278
while !todo1.is_empty() && fuel > 0 {
292-
todo1.front_mut().unwrap().work(
293-
output,
294-
|k, v2, v1, t, r2, r1| work_result(k, v1, v2, t, r1, r2),
295-
&mut fuel,
296-
);
279+
todo1
280+
.front_mut()
281+
.unwrap()
282+
.work(output, &mut result, &mut fuel);
297283
if !todo1.front().unwrap().work_remains() {
298284
todo1.pop_front();
299285
}
@@ -305,7 +291,7 @@ where
305291
todo2
306292
.front_mut()
307293
.unwrap()
308-
.work(output, &mut work_result, &mut fuel);
294+
.work(output, &mut result, &mut fuel);
309295
if !todo2.front().unwrap().work_remains() {
310296
todo2.pop_front();
311297
}
@@ -373,10 +359,10 @@ where
373359
C1: Cursor<Key = Row, Val = Row, Time = T, R = Diff>,
374360
C2: Cursor<Key = Row, Val = Row, Time = T, R = Diff>,
375361
{
376-
trace: C1,
377-
trace_storage: C1::Storage,
378-
batch: C2,
379-
batch_storage: C2::Storage,
362+
cursor1: C1,
363+
storage1: C1::Storage,
364+
cursor2: C2,
365+
storage2: C2::Storage,
380366
capability: Capability<T>,
381367
done: bool,
382368
temp: Vec<(D, T, Diff)>,
@@ -390,17 +376,17 @@ where
390376
D: Data,
391377
{
392378
fn new(
393-
trace: C1,
394-
trace_storage: C1::Storage,
395-
batch: C2,
396-
batch_storage: C2::Storage,
379+
cursor1: C1,
380+
storage1: C1::Storage,
381+
cursor2: C2,
382+
storage2: C2::Storage,
397383
capability: Capability<T>,
398384
) -> Self {
399385
Deferred {
400-
trace,
401-
trace_storage,
402-
batch,
403-
batch_storage,
386+
cursor1,
387+
storage1,
388+
cursor2,
389+
storage2,
404390
capability,
405391
done: false,
406392
temp: Vec::new(),
@@ -415,46 +401,50 @@ where
415401
fn work<L, I>(
416402
&mut self,
417403
output: &mut OutputHandle<T, (D, T, Diff), Tee<T, (D, T, Diff)>>,
418-
mut logic: L,
404+
mut result: L,
419405
fuel: &mut usize,
420406
) where
421-
I: IntoIterator<Item = (D, T, Diff)>,
422-
L: FnMut(&C1::Key, &C1::Val, &C2::Val, &T, &C1::R, &C2::R) -> I,
407+
I: IntoIterator<Item = D>,
408+
L: FnMut(&C1::Key, &C1::Val, &C2::Val) -> I,
423409
{
424410
let meet = self.capability.time();
425411

426412
let mut session = output.session(&self.capability);
427413

428-
let trace_storage = &self.trace_storage;
429-
let batch_storage = &self.batch_storage;
414+
let storage1 = &self.storage1;
415+
let storage2 = &self.storage2;
430416

431-
let trace = &mut self.trace;
432-
let batch = &mut self.batch;
417+
let cursor1 = &mut self.cursor1;
418+
let cursor2 = &mut self.cursor2;
433419

434420
let temp = &mut self.temp;
435421

436-
while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) {
437-
match trace.key(trace_storage).cmp(batch.key(batch_storage)) {
438-
Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)),
439-
Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)),
422+
while cursor2.key_valid(storage2) && cursor1.key_valid(storage1) {
423+
match cursor1.key(storage1).cmp(cursor2.key(storage2)) {
424+
Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)),
425+
Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)),
440426
Ordering::Equal => {
441427
assert_eq!(temp.len(), 0);
442428

443429
// Populate `temp` with the results, as long as fuel remains.
444-
let key = batch.key(batch_storage);
445-
while let Some(val1) = trace.get_val(trace_storage) {
446-
while let Some(val2) = batch.get_val(batch_storage) {
447-
trace.map_times(trace_storage, |time1, diff1| {
430+
let key = cursor2.key(storage2);
431+
while let Some(val1) = cursor1.get_val(storage1) {
432+
while let Some(val2) = cursor2.get_val(storage2) {
433+
cursor1.map_times(storage1, |time1, diff1| {
448434
let time1 = time1.join(meet);
449-
batch.map_times(batch_storage, |time2, diff2| {
435+
cursor2.map_times(storage2, |time2, diff2| {
450436
let time = time1.join(time2);
451-
temp.extend(logic(key, val1, val2, &time, diff1, diff2))
437+
let diff = diff1.multiply(diff2);
438+
let results = result(key, val1, val2)
439+
.into_iter()
440+
.map(|d| (d, time.clone(), diff.clone()));
441+
temp.extend(results);
452442
});
453443
});
454-
batch.step_val(batch_storage);
444+
cursor2.step_val(storage2);
455445
}
456-
batch.rewind_vals(batch_storage);
457-
trace.step_val(trace_storage);
446+
cursor2.rewind_vals(storage2);
447+
cursor1.step_val(storage1);
458448

459449
// TODO: This consolidation is optional, and it may not be very
460450
// helpful. We might try harder to understand whether we
@@ -472,8 +462,8 @@ where
472462
}
473463
}
474464

475-
batch.step_key(batch_storage);
476-
trace.step_key(trace_storage);
465+
cursor2.step_key(storage2);
466+
cursor1.step_key(storage1);
477467
}
478468
}
479469
}

0 commit comments

Comments
 (0)