Commit b6709dc

mz_join_v2: process work by polling futures
This commit changes how `mz_join_core_v2::Work` keeps pending work. Instead of storing `Deferred` objects, it stores `Future`s, which it polls to make progress on the pending work. The `work` method becomes an async method, allowing us to yield at arbitrary points without needing to worry about self-references held across yield points.
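As an illustration of the pattern described above, here is a minimal, self-contained sketch, not the actual `mz_join_core_v2` operator code: pending work is stored as pinned, boxed futures and driven by polling with a no-op waker, re-queueing whatever has not completed. The `WorkQueue` type and the `process`/`push` signatures are made up for this example; only the `futures` crate's `noop_waker` is assumed.

    use std::collections::VecDeque;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;

    /// Hypothetical driver mirroring the commit's approach: pending work lives in a
    /// queue of pinned futures that we poll manually, outside of any async runtime.
    #[derive(Default)]
    struct WorkQueue {
        todo: VecDeque<Pin<Box<dyn Future<Output = ()>>>>,
    }

    impl WorkQueue {
        fn push(&mut self, fut: impl Future<Output = ()> + 'static) {
            self.todo.push_back(Box::pin(fut));
        }

        /// Drive pending futures until none remain or `should_yield` says to stop.
        fn process(&mut self, should_yield: impl Fn() -> bool) {
            // A no-op waker suffices because we re-poll explicitly on the next call.
            let waker = futures::task::noop_waker();
            let mut ctx = Context::from_waker(&waker);

            while let Some(mut fut) = self.todo.pop_front() {
                if fut.as_mut().poll(&mut ctx).is_pending() {
                    // Not done yet: keep it at the front so we resume it next time.
                    self.todo.push_front(fut);
                }
                if should_yield() {
                    break;
                }
            }
        }
    }

In the commit, `Work::process` plays the role of `process` here, with `yield_fn` and the counter of produced results deciding when to stop.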
1 parent 0930887 commit b6709dc

2 files changed (+92 -81 lines)

src/compute/src/render/join/linear_join.rs

Lines changed: 1 addition & 2 deletions
@@ -113,8 +113,7 @@ impl LinearJoinSpec {
             + Clone
             + 'static,
         L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
-        I: IntoIterator,
-        I::Item: Data,
+        I: IntoIterator<Item: Data> + 'static,
     {
         use LinearJoinImpl::*;

src/compute/src/render/join/mz_join_core_v2.rs

Lines changed: 91 additions & 79 deletions
@@ -28,9 +28,12 @@
 //! Eventually, we hope that `mz_join_core_v2` proves itself sufficiently to become the only join
 //! implementation.
 
+use std::cell::Cell;
 use std::cell::RefCell;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
+use std::marker::PhantomData;
+use std::pin::Pin;
 use std::rc::Rc;
 use std::time::Instant;
@@ -41,12 +44,12 @@ use differential_dataflow::difference::Multiply;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::arrangement::Arranged;
 use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
+use mz_ore::future::yield_now;
 use mz_repr::Diff;
 use timely::PartialOrder;
 use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
 use timely::dataflow::channels::pact::Pipeline;
 use timely::dataflow::channels::pushers::Tee;
-use timely::dataflow::channels::pushers::buffer::Session;
 use timely::dataflow::operators::generic::OutputHandleCore;
 use timely::dataflow::operators::{Capability, Operator};
 use timely::dataflow::{Scope, StreamCore};
@@ -75,8 +78,7 @@ where
         + Clone
         + 'static,
     L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
-    I: IntoIterator,
-    I::Item: Data,
+    I: IntoIterator<Item: Data> + 'static,
    YFn: Fn(Instant, usize) -> bool + 'static,
    C: SizableContainer + PushInto<(I::Item, G::Timestamp, Diff)> + Data,
 {
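Both files replace the two-line bound `I: IntoIterator, I::Item: Data` with the associated-type-bound form `I: IntoIterator<Item: Data> + 'static`. The two spellings are equivalent (the sugar was stabilized in Rust 1.79); a hypothetical free-standing example, not from this codebase:

    use std::fmt::Debug;

    // `I: IntoIterator<Item: Debug>` is shorthand for `I: IntoIterator, I::Item: Debug`.
    fn print_all<I: IntoIterator<Item: Debug>>(items: I) {
        for item in items {
            println!("{item:?}");
        }
    }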
@@ -497,23 +499,36 @@ where
     C2: Cursor,
 {
     /// Pending work.
-    todo: VecDeque<Deferred<C1, C2, D>>,
+    todo: VecDeque<(Pin<Box<dyn Future<Output = ()>>>, Capability<C1::Time>)>,
     /// A function that transforms raw join matches into join results.
     result_fn: Rc<RefCell<L>>,
+    /// A buffer holding the join results.
+    ///
+    /// Written by the work futures, drained by `Work::process`.
+    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
+    /// The number of join results produced by work futures.
+    ///
+    /// Used with `yield_fn` to inform when `Work::process` should yield.
+    produced: Rc<Cell<usize>>,
+
+    _cursors: PhantomData<(C1, C2)>,
 }
 
 impl<C1, C2, D, L, I> Work<C1, C2, D, L>
 where
-    C1: Cursor<Diff = Diff>,
-    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
+    C1: Cursor<Diff = Diff> + 'static,
+    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff> + 'static,
     D: Data,
-    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I,
-    I: IntoIterator<Item = D>,
+    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
+    I: IntoIterator<Item = D> + 'static,
 {
     fn new(result_fn: Rc<RefCell<L>>) -> Self {
         Self {
             todo: Default::default(),
             result_fn,
+            output: Default::default(),
+            produced: Default::default(),
+            _cursors: PhantomData,
         }
     }

@@ -536,15 +551,20 @@ where
         storage2: C2::Storage,
         capability: Capability<C1::Time>,
     ) {
-        self.todo.push_back(Deferred {
+        let deferred = Deferred {
             cursor1,
             storage1,
             cursor2,
             storage2,
-            capability,
-            done: false,
-            temp: Default::default(),
-        });
+            capability: capability.clone(),
+        };
+        let fut = deferred.work(
+            Rc::clone(&self.result_fn),
+            Rc::clone(&self.output),
+            Rc::clone(&self.produced),
+        );
+
+        self.todo.push_back((Box::pin(fut), capability));
     }
 
     /// Discard all pending work.
@@ -562,20 +582,43 @@ where
         YFn: Fn(Instant, usize) -> bool,
     {
         let start_time = Instant::now();
-        let mut produced = 0;
-
-        while !yield_fn(start_time, produced)
-            && let Some(mut deferred) = self.todo.pop_front()
-        {
-            deferred.work(
-                output,
-                &mut *self.result_fn.borrow_mut(),
-                |w| yield_fn(start_time, w),
-                &mut produced,
-            );
+        self.produced.set(0);
+
+        let waker = futures::task::noop_waker();
+        let mut ctx = std::task::Context::from_waker(&waker);
+
+        while let Some((mut fut, cap)) = self.todo.pop_front() {
+            // Drive the work future until it's done or it's time to yield.
+            let mut done = false;
+            let mut should_yield = false;
+            while !done && !should_yield {
+                done = fut.as_mut().poll(&mut ctx).is_ready();
+                should_yield = yield_fn(start_time, self.produced.get());
+            }
+
+            // Drain the produced join results.
+            let mut output_buf = self.output.borrow_mut();
+
+            // Consolidating here is important when the join closure produces data that
+            // consolidates well, for example when projecting columns.
+            let old_len = output_buf.len();
+            consolidate_updates(&mut output_buf);
+            let recovered = old_len - output_buf.len();
+            self.produced.update(|x| x - recovered);
+
+            output.session(&cap).give_iterator(output_buf.drain(..));
+
+            if done {
+                // We have finished processing a chunk of work. Use this opportunity to truncate
+                // the output buffer, so we don't keep excess memory allocated forever.
+                *output_buf = Default::default();
+            } else if !done {
+                // Still work to do in this chunk.
+                self.todo.push_front((fut, cap));
+            }
 
-            if !deferred.done {
-                self.todo.push_front(deferred);
+            if should_yield {
+                break;
             }
         }
     }
@@ -586,7 +629,7 @@ where
 /// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
 /// This allows us to avoid producing and buffering massive amounts of data, without giving the timely
 /// dataflow system a chance to run operators that can consume and aggregate the data.
-struct Deferred<C1, C2, D>
+struct Deferred<C1, C2>
 where
     C1: Cursor,
     C2: Cursor,
@@ -596,63 +639,45 @@ where
     cursor2: C2,
     storage2: C2::Storage,
     capability: Capability<C1::Time>,
-    done: bool,
-    temp: Vec<(D, C1::Time, Diff)>,
 }
 
-impl<C1, C2, D> Deferred<C1, C2, D>
+impl<C1, C2> Deferred<C1, C2>
 where
     C1: Cursor<Diff = Diff>,
     C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
-    D: Data,
 {
     /// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
-    fn work<L, I, YFn, C>(
-        &mut self,
-        output: &mut OutputHandleCore<C1::Time, CapacityContainerBuilder<C>, Tee<C1::Time, C>>,
-        mut logic: L,
-        yield_fn: YFn,
-        produced: &mut usize,
+    async fn work<L, I, D>(
+        mut self,
+        logic: Rc<RefCell<L>>,
+        output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
+        produced: Rc<Cell<usize>>,
     ) where
         I: IntoIterator<Item = D>,
         L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I,
-        YFn: Fn(usize) -> bool,
-        C: SizableContainer + PushInto<(D, C1::Time, Diff)> + Data,
+        D: Data,
     {
         let meet = self.capability.time();
 
-        let mut session = output.session(&self.capability);
-
         let storage1 = &self.storage1;
         let storage2 = &self.storage2;
 
         let cursor1 = &mut self.cursor1;
         let cursor2 = &mut self.cursor2;
 
-        let temp = &mut self.temp;
-
-        let flush = |data: &mut Vec<_>, session: &mut Session<_, _, _>| {
-            let old_len = data.len();
-            // Consolidating here is important when the join closure produces data that
-            // consolidates well, for example when projecting columns.
-            consolidate_updates(data);
-            let recovered = old_len - data.len();
-            session.give_iterator(data.drain(..));
-            recovered
-        };
-
-        assert_eq!(temp.len(), 0);
-
         let mut buffer = Vec::default();
 
         while cursor1.key_valid(storage1) && cursor2.key_valid(storage2) {
             match cursor1.key(storage1).cmp(&cursor2.key(storage2)) {
                 Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)),
                 Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)),
                 Ordering::Equal => {
-                    // Populate `temp` with the results, until we should yield.
+                    // Populate `output` with the results, until we should yield.
                     let key = cursor2.key(storage2);
                     while let Some(val1) = cursor1.get_val(storage1) {
+                        let mut logic = logic.borrow_mut();
+                        let mut output = output.borrow_mut();
+
                         while let Some(val2) = cursor2.get_val(storage2) {
                             // Evaluate logic on `key, val1, val2`. Note the absence of time and diff.
                             let mut result = logic(key, val1, val2).into_iter().peekable();
@@ -673,55 +698,42 @@ where
                                 });
                                 consolidate(&mut buffer);
 
+                                produced.update(|x| x + buffer.len());
+
                                 // Special case no results, one result, and potentially many results
                                 match (result.peek().is_some(), buffer.len()) {
                                     // Certainly no output
                                     (_, 0) => {}
                                     // Single element, single time
                                     (false, 1) => {
                                         let (time, diff) = buffer.pop().unwrap();
-                                        temp.push((first, time, diff));
+                                        output.push((first, time, diff));
                                     }
                                     // Multiple elements or multiple times
                                     (_, _) => {
                                         for d in std::iter::once(first).chain(result) {
-                                            temp.extend(buffer.iter().map(|(time, diff)| {
-                                                (d.clone(), time.clone(), diff.clone())
-                                            }))
+                                            let updates = buffer
+                                                .drain(..)
+                                                .map(|(time, diff)| (d.clone(), time, diff));
+                                            output.extend(updates);
                                         }
                                     }
                                 }
-                                buffer.clear();
                             }
                             cursor2.step_val(storage2);
                         }
                         cursor1.step_val(storage1);
                         cursor2.rewind_vals(storage2);
 
-                        *produced = produced.saturating_add(temp.len());
-
-                        if yield_fn(*produced) {
-                            // Returning here is only allowed because we leave the cursors in a
-                            // state that will let us pick up the work correctly on the next
-                            // invocation.
-                            *produced -= flush(temp, &mut session);
-                            if yield_fn(*produced) {
-                                return;
-                            }
-                        }
+                        // Drop all shared state before yielding.
+                        drop((logic, output));
+                        yield_now().await;
                     }
 
                     cursor1.step_key(storage1);
                     cursor2.step_key(storage2);
                 }
             }
         }
-
-        if !temp.is_empty() {
-            *produced -= flush(temp, &mut session);
-        }
-
-        // We only get here after having iterated through all keys.
-        self.done = true;
     }
 }
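The work future suspends itself via `mz_ore::future::yield_now().await`. As a rough mental model (an assumption about the helper's behavior, not its actual source), such a cooperative yield point can be sketched as a future that is pending exactly once:

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Sketch of a cooperative yield point: the first poll returns `Pending`,
    /// every later poll returns `Ready`, handing control back to the caller in
    /// between. `Work::process` uses that window to drain results and consult
    /// `yield_fn` before polling again.
    struct YieldNow {
        yielded: bool,
    }

    fn yield_now() -> YieldNow {
        YieldNow { yielded: false }
    }

    impl Future for YieldNow {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if self.yielded {
                Poll::Ready(())
            } else {
                self.yielded = true;
                // Harmless with the no-op waker used by `Work::process`; a real
                // executor would use this to schedule a re-poll.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }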