Skip to content

Add a join_core_yielding operator #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 169 additions & 30 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! + (b * c), and if this is not equal to the former term, little is known about the actual output.
use std::fmt::Debug;
use std::cmp::Ordering;
use std::time::Instant;

use timely::order::PartialOrder;
use timely::progress::Timestamp;
Expand Down Expand Up @@ -265,9 +266,68 @@ pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Time
L: FnMut(&K,&V,&Tr2::Val)->I+'static,
;

/// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and
/// `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for more
/// flexibility, but is more error-prone.
/// A variant of `join_core` that allows specifying a custom `yield_function` to control when the operator
/// should yield to the runtime. The `yield_function` receives an `Instant` indicating the start of
/// computation and the number of produced records, and should return `true` iff the operator should yield.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
/// which produces something implementing `IntoIterator`, where the output collection will have an entry for
/// every value returned by the iterator.
///
/// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
/// contains the implementations for collections.
///
/// # Examples
///
/// ```
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::arrange::ArrangeByKey;
/// use differential_dataflow::operators::join::JoinCore;
/// use differential_dataflow::trace::Trace;
/// use differential_dataflow::trace::implementations::ord::OrdValSpine;
///
/// fn main() {
/// ::timely::example(|scope| {
///
/// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
/// .arrange_by_key();
/// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
/// .arrange_by_key();
///
/// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
///
/// x.join_core_yielding(
/// &y,
/// |_key, &a, &b| Some((a, b)),
/// |timer, _count| timer.elapsed().as_millis() > 10,
/// ).assert_eq(&z);
/// });
/// }
/// ```
fn join_core_yielding<Tr2,I,L,Y>(
&self,
stream2: &Arranged<G,Tr2>,
result: L,
yield_function: Y,
) -> Collection<G,I::Item,<R as Multiply<Tr2::R>>::Output>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::R: Semigroup,
R: Multiply<Tr2::R>,
<R as Multiply<Tr2::R>>::Output: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(&K,&V,&Tr2::Val)->I+'static,
Y: Fn(Instant,usize)->bool+'static
;

/// An unsafe variant of `join_core_yielding` where the `result` closure takes additional arguments for
/// `time` and `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for
/// more flexibility, but is more error-prone.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
/// which produces something implementing `IntoIterator`, where the output collection will have an entry
Expand Down Expand Up @@ -299,12 +359,20 @@ pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Time
/// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1;
///
/// // Returned values have weight `a`
/// x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a)))
/// .assert_eq(&z);
/// x.join_core_internal_unsafe(
/// &y,
/// |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a)),
/// |_timer, count| count >= 1_000_000,
/// ).assert_eq(&z);
/// });
/// }
/// ```
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
fn join_core_internal_unsafe<Tr2,I,L,D,ROut,Y>(
&self,
stream2: &Arranged<G,Tr2>,
result: L,
yield_function: Y,
) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Expand All @@ -313,6 +381,7 @@ pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Time
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::R)->I+'static,
Y: Fn(Instant,usize)->bool+'static
;
}

Expand Down Expand Up @@ -340,7 +409,33 @@ where
.join_core(stream2, result)
}

fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
fn join_core_yielding<Tr2,I,L,Y>(
&self,
stream2: &Arranged<G,Tr2>,
result: L,
yield_function: Y,
) -> Collection<G,I::Item,<R as Multiply<Tr2::R>>::Output>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::R: Semigroup,
R: Multiply<Tr2::R>,
<R as Multiply<Tr2::R>>::Output: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(&K,&V,&Tr2::Val)->I+'static,
Y: Fn(Instant,usize)->bool+'static,
{
self.arrange_by_key()
.join_core_yielding(stream2, result, yield_function)
}

fn join_core_internal_unsafe<Tr2,I,L,D,ROut,Y>(
&self,
stream2: &Arranged<G,Tr2>,
result: L,
yield_function: Y,
) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Expand All @@ -350,8 +445,10 @@ where
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::R)->I+'static,
Y: Fn(Instant,usize)->bool+'static,
{
self.arrange_by_key().join_core_internal_unsafe(stream2, result)
self.arrange_by_key()
.join_core_internal_unsafe(stream2, result, yield_function)
}

}
Expand All @@ -365,7 +462,7 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
T1::Val: Ord+Clone+Debug+'static,
T1::R: Semigroup,
{
fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<T1::R as Multiply<Tr2::R>>::Output>
fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<T1::R as Multiply<Tr2::R>>::Output>
where
Tr2::Val: Ord+Clone+Debug+'static,
Tr2: TraceReader<Key=T1::Key,Time=G::Timestamp>+Clone+'static,
Expand All @@ -375,16 +472,41 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
I: IntoIterator,
I::Item: Data,
L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static
{
self.join_core_yielding(other, result, |_timer, count| count >= 1_000_000)
}

fn join_core_yielding<Tr2,I,L,Y>(
&self,
other: &Arranged<G,Tr2>,
mut result: L,
yield_function: Y,
) -> Collection<G,I::Item,<T1::R as Multiply<Tr2::R>>::Output>
where
Tr2::Val: Ord+Clone+Debug+'static,
Tr2: TraceReader<Key=T1::Key,Time=G::Timestamp>+Clone+'static,
Tr2::R: Semigroup,
T1::R: Multiply<Tr2::R>,
<T1::R as Multiply<Tr2::R>>::Output: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static,
Y: Fn(Instant,usize)->bool+'static,
{
let result = move |k: &T1::Key, v1: &T1::Val, v2: &Tr2::Val, t: &G::Timestamp, r1: &T1::R, r2: &Tr2::R| {
let t = t.clone();
let r = (r1.clone()).multiply(r2);
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
};
self.join_core_internal_unsafe(other, result)
self.join_core_internal_unsafe(other, result, yield_function)
}

fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,D,ROut>
fn join_core_internal_unsafe<Tr2,I,L,D,ROut,Y>(
&self,
other: &Arranged<G,Tr2>,
mut result: L,
yield_function: Y,
) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=T1::Key, Time=G::Timestamp>+Clone+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Expand All @@ -393,6 +515,7 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(&T1::Key,&T1::Val,&Tr2::Val,&G::Timestamp,&T1::R,&Tr2::R)->I+'static,
Y: Fn(Instant,usize)->bool+'static,
{
// Rename traces for symmetry from here on out.
let mut trace1 = self.trace.clone();
Expand Down Expand Up @@ -560,25 +683,33 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
// input must scan all batches from the other input).

// Perform some amount of outstanding work.
let mut fuel = 1_000_000;
while !todo1.is_empty() && fuel > 0 {
todo1.front_mut().unwrap().work(
let timer = Instant::now();
let yield_fn = |count| yield_function(timer, count);
let mut effort = 0;
while let Some(mut work) = todo1.pop_front() {
work.work(
output,
|k,v2,v1,t,r2,r1| result(k,v1,v2,t,r1,r2),
&mut fuel
&mut effort,
yield_fn,
);
if !todo1.front().unwrap().work_remains() { todo1.pop_front(); }
if work.work_remains() { todo1.push_front(work) }
if yield_fn(effort) { break }
}

// Perform some amount of outstanding work.
let mut fuel = 1_000_000;
while !todo2.is_empty() && fuel > 0 {
todo2.front_mut().unwrap().work(
let timer = Instant::now();
let yield_fn = |count| yield_function(timer, count);
let mut effort = 0;
while let Some(mut work) = todo2.pop_front() {
work.work(
output,
|k,v1,v2,t,r1,r2| result(k,v1,v2,t,r1,r2),
&mut fuel
&mut effort,
yield_fn,
);
if !todo2.front().unwrap().work_remains() { todo2.pop_front(); }
if work.work_remains() { todo2.push_front(work) }
if yield_fn(effort) { break }
}

// Re-activate operator if work remains.
Expand Down Expand Up @@ -687,14 +818,23 @@ where
!self.done
}

/// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
/// Process keys until `yield_fn` returns `true`, or the work is exhausted.
#[inline(never)]
fn work<L, I>(&mut self, output: &mut OutputHandle<T, (D, T, R), Tee<T, (D, T, R)>>, mut logic: L, fuel: &mut usize)
where I: IntoIterator<Item=(D, T, R)>, L: FnMut(&K, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I {
fn work<L, I, Y>(
&mut self,
output: &mut OutputHandle<T, (D, T, R), Tee<T, (D, T, R)>>,
mut logic: L,
effort: &mut usize,
yield_fn: Y,
)
where
I: IntoIterator<Item=(D, T, R)>,
L: FnMut(&K, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I,
Y: Fn(usize)->bool,
{

let meet = self.capability.time();

let mut effort = 0;
let mut session = output.session(&self.capability);

let trace_storage = &self.trace_storage;
Expand All @@ -706,7 +846,7 @@ where
let temp = &mut self.temp;
let mut thinker = JoinThinker::new();

while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel {
while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) {

match trace.key(trace_storage).cmp(batch.key(batch_storage)) {
Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)),
Expand All @@ -733,7 +873,7 @@ where
// consolidation, and then deposit results in `session`.
crate::consolidation::consolidate(temp);

effort += temp.len();
*effort += temp.len();
for ((d, t), r) in temp.drain(..) {
session.give((d, t, r));
}
Expand All @@ -745,12 +885,11 @@ where
thinker.history2.clear();
}
}

if yield_fn(*effort) { break }
}

self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage);

if effort > *fuel { *fuel = 0; }
else { *fuel -= effort; }
}
}

Expand Down
28 changes: 27 additions & 1 deletion tests/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,30 @@ fn join_scaling(scale: u64) {

let extracted = data.extract();
assert_eq!(extracted.len(), 0);
}
}

/// Ensure that a join completes even with the most aggressive yield configuration.
#[test]
fn join_core_yielding_aggressive() {
let time = Default::default();
let data = timely::example(move |scope| {
let arr1 = [((0, 0), time, 1), ((1, 2), time, 1)]
.to_stream(scope)
.as_collection()
.arrange_by_key();
let arr2 = [((0, 'a'), time, 1), ((1, 'B'), time, 1)]
.to_stream(scope)
.as_collection()
.arrange_by_key();

arr1.join_core_yielding(
&arr2,
|k, a, b| Some((*k, *a, *b)),
|_timer, _count| true,
).inner.capture()
});

let extracted = data.extract();
assert_eq!(extracted.len(), 1);
assert_eq!(extracted[0].1, vec![((0, 0, 'a'), time, 1), ((1, 2, 'B'), time, 1)]);
}