From f53030f809b194f925473bcb67db69af71075213 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Wed, 26 Apr 2023 13:21:27 +0200 Subject: [PATCH] Add a `join_core_yielding` operator This commit adds a `JoinCore::join_core_yielding` operator that allows specifying a `yield_function` to control the join's yield behavior, similarly to how the half join operator is configurable. The `yield_function` enables yielding based on time and number of produced records. The `yield_function` replaces the previous fueling concept used by the join operator. However, higher-level join operators that don't explicitly specify a `yield_function` still have the old behavior of yielding after 1 million produced records, so backwards-compatibility is maintained for all but direct users of `join_core_internal_unsafe`. Some additional care is taken to ensure the `yield_function` is only checked after the join has made some progress. This is to avoid stuck joins caused by overly aggressive `yield_function`s. --- src/operators/join.rs | 199 +++++++++++++++++++++++++++++++++++------- tests/join.rs | 28 +++++- 2 files changed, 196 insertions(+), 31 deletions(-) diff --git a/src/operators/join.rs b/src/operators/join.rs index 8710ac07b..2ce17c4ec 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -5,6 +5,7 @@ //! + (b * c), and if this is not equal to the former term, little is known about the actual output. use std::fmt::Debug; use std::cmp::Ordering; +use std::time::Instant; use timely::order::PartialOrder; use timely::progress::Timestamp; @@ -265,9 +266,68 @@ pub trait JoinCore where G::Time L: FnMut(&K,&V,&Tr2::Val)->I+'static, ; - /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and - /// `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for more - /// flexibility, but is more error-prone. + /// A variant of `join_core` that allows specifying a custom `yield_function` to control when the operator + /// should yield to the runtime. The `yield_function` receives an `Instant` indicating the start of + /// computation and the number of produced records, and should return `true` iff the operator should yield. + /// + /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function, + /// which produces something implementing `IntoIterator`, where the output collection will have an entry for + /// every value returned by the iterator. + /// + /// This trait is implemented for arrangements (`Arranged`) rather than collections. The `Join` trait + /// contains the implementations for collections. + /// + /// # Examples + /// + /// ``` + /// extern crate timely; + /// extern crate differential_dataflow; + /// + /// use differential_dataflow::input::Input; + /// use differential_dataflow::operators::arrange::ArrangeByKey; + /// use differential_dataflow::operators::join::JoinCore; + /// use differential_dataflow::trace::Trace; + /// use differential_dataflow::trace::implementations::ord::OrdValSpine; + /// + /// fn main() { + /// ::timely::example(|scope| { + /// + /// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1 + /// .arrange_by_key(); + /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1 + /// .arrange_by_key(); + /// + /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1; + /// + /// x.join_core_yielding( + /// &y, + /// |_key, &a, &b| Some((a, b)), + /// |timer, _count| timer.elapsed().as_millis() > 10, + /// ).assert_eq(&z); + /// }); + /// } + /// ``` + fn join_core_yielding( + &self, + stream2: &Arranged, + result: L, + yield_function: Y, + ) -> Collection>::Output> + where + Tr2: TraceReader+Clone+'static, + Tr2::Val: Ord+Clone+Debug+'static, + Tr2::R: Semigroup, + R: Multiply, + >::Output: Semigroup, + I: IntoIterator, + I::Item: Data, + L: FnMut(&K,&V,&Tr2::Val)->I+'static, + Y: Fn(Instant,usize)->bool+'static + ; + + /// An unsafe variant of `join_core_yielding` where the `result` closure takes additional arguments for + /// `time` and `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for + /// more flexibility, but is more error-prone. /// /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function, /// which produces something implementing `IntoIterator`, where the output collection will have an entry @@ -299,12 +359,20 @@ pub trait JoinCore where G::Time /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1; /// /// // Returned values have weight `a` - /// x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a))) - /// .assert_eq(&z); + /// x.join_core_internal_unsafe( + /// &y, + /// |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a)), + /// |_timer, count| count >= 1_000_000, + /// ).assert_eq(&z); /// }); /// } /// ``` - fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection + fn join_core_internal_unsafe( + &self, + stream2: &Arranged, + result: L, + yield_function: Y, + ) -> Collection where Tr2: TraceReader+Clone+'static, Tr2::Val: Ord+Clone+Debug+'static, @@ -313,6 +381,7 @@ pub trait JoinCore where G::Time ROut: Semigroup, I: IntoIterator, L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::R)->I+'static, + Y: Fn(Instant,usize)->bool+'static ; } @@ -340,7 +409,33 @@ where .join_core(stream2, result) } - fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection + fn join_core_yielding( + &self, + stream2: &Arranged, + result: L, + yield_function: Y, + ) -> Collection>::Output> + where + Tr2: TraceReader+Clone+'static, + Tr2::Val: Ord+Clone+Debug+'static, + Tr2::R: Semigroup, + R: Multiply, + >::Output: Semigroup, + I: IntoIterator, + I::Item: Data, + L: FnMut(&K,&V,&Tr2::Val)->I+'static, + Y: Fn(Instant,usize)->bool+'static, + { + self.arrange_by_key() + .join_core_yielding(stream2, result, yield_function) + } + + fn join_core_internal_unsafe( + &self, + stream2: &Arranged, + result: L, + yield_function: Y, + ) -> Collection where Tr2: TraceReader+Clone+'static, Tr2::Val: Ord+Clone+Debug+'static, @@ -350,8 +445,10 @@ where ROut: Semigroup, I: IntoIterator, L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::R)->I+'static, + Y: Fn(Instant,usize)->bool+'static, { - self.arrange_by_key().join_core_internal_unsafe(stream2, result) + self.arrange_by_key() + .join_core_internal_unsafe(stream2, result, yield_function) } } @@ -365,7 +462,7 @@ impl JoinCore for Arranged T1::Val: Ord+Clone+Debug+'static, T1::R: Semigroup, { - fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> + fn join_core(&self, other: &Arranged, result: L) -> Collection>::Output> where Tr2::Val: Ord+Clone+Debug+'static, Tr2: TraceReader+Clone+'static, @@ -375,16 +472,41 @@ impl JoinCore for Arranged I: IntoIterator, I::Item: Data, L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static + { + self.join_core_yielding(other, result, |_timer, count| count >= 1_000_000) + } + + fn join_core_yielding( + &self, + other: &Arranged, + mut result: L, + yield_function: Y, + ) -> Collection>::Output> + where + Tr2::Val: Ord+Clone+Debug+'static, + Tr2: TraceReader+Clone+'static, + Tr2::R: Semigroup, + T1::R: Multiply, + >::Output: Semigroup, + I: IntoIterator, + I::Item: Data, + L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static, + Y: Fn(Instant,usize)->bool+'static, { let result = move |k: &T1::Key, v1: &T1::Val, v2: &Tr2::Val, t: &G::Timestamp, r1: &T1::R, r2: &Tr2::R| { let t = t.clone(); let r = (r1.clone()).multiply(r2); result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone())) }; - self.join_core_internal_unsafe(other, result) + self.join_core_internal_unsafe(other, result, yield_function) } - fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> Collection + fn join_core_internal_unsafe( + &self, + other: &Arranged, + mut result: L, + yield_function: Y, + ) -> Collection where Tr2: TraceReader+Clone+'static, Tr2::Val: Ord+Clone+Debug+'static, @@ -393,6 +515,7 @@ impl JoinCore for Arranged ROut: Semigroup, I: IntoIterator, L: FnMut(&T1::Key,&T1::Val,&Tr2::Val,&G::Timestamp,&T1::R,&Tr2::R)->I+'static, + Y: Fn(Instant,usize)->bool+'static, { // Rename traces for symmetry from here on out. let mut trace1 = self.trace.clone(); @@ -560,25 +683,33 @@ impl JoinCore for Arranged // input must scan all batches from the other input). // Perform some amount of outstanding work. - let mut fuel = 1_000_000; - while !todo1.is_empty() && fuel > 0 { - todo1.front_mut().unwrap().work( + let timer = Instant::now(); + let yield_fn = |count| yield_function(timer, count); + let mut effort = 0; + while let Some(mut work) = todo1.pop_front() { + work.work( output, |k,v2,v1,t,r2,r1| result(k,v1,v2,t,r1,r2), - &mut fuel + &mut effort, + yield_fn, ); - if !todo1.front().unwrap().work_remains() { todo1.pop_front(); } + if work.work_remains() { todo1.push_front(work) } + if yield_fn(effort) { break } } // Perform some amount of outstanding work. - let mut fuel = 1_000_000; - while !todo2.is_empty() && fuel > 0 { - todo2.front_mut().unwrap().work( + let timer = Instant::now(); + let yield_fn = |count| yield_function(timer, count); + let mut effort = 0; + while let Some(mut work) = todo2.pop_front() { + work.work( output, |k,v1,v2,t,r1,r2| result(k,v1,v2,t,r1,r2), - &mut fuel + &mut effort, + yield_fn, ); - if !todo2.front().unwrap().work_remains() { todo2.pop_front(); } + if work.work_remains() { todo2.push_front(work) } + if yield_fn(effort) { break } } // Re-activate operator if work remains. @@ -687,14 +818,23 @@ where !self.done } - /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. + /// Process keys until `yield_fn` returns `true`, or the work is exhausted. #[inline(never)] - fn work(&mut self, output: &mut OutputHandle>, mut logic: L, fuel: &mut usize) - where I: IntoIterator, L: FnMut(&K, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I { + fn work( + &mut self, + output: &mut OutputHandle>, + mut logic: L, + effort: &mut usize, + yield_fn: Y, + ) + where + I: IntoIterator, + L: FnMut(&K, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I, + Y: Fn(usize)->bool, + { let meet = self.capability.time(); - let mut effort = 0; let mut session = output.session(&self.capability); let trace_storage = &self.trace_storage; @@ -706,7 +846,7 @@ where let temp = &mut self.temp; let mut thinker = JoinThinker::new(); - while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel { + while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) { match trace.key(trace_storage).cmp(batch.key(batch_storage)) { Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)), @@ -733,7 +873,7 @@ where // consolidation, and then deposit results in `session`. crate::consolidation::consolidate(temp); - effort += temp.len(); + *effort += temp.len(); for ((d, t), r) in temp.drain(..) { session.give((d, t, r)); } @@ -745,12 +885,11 @@ where thinker.history2.clear(); } } + + if yield_fn(*effort) { break } } self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage); - - if effort > *fuel { *fuel = 0; } - else { *fuel -= effort; } } } diff --git a/tests/join.rs b/tests/join.rs index 2c956a4b1..d0ccb223f 100644 --- a/tests/join.rs +++ b/tests/join.rs @@ -101,4 +101,30 @@ fn join_scaling(scale: u64) { let extracted = data.extract(); assert_eq!(extracted.len(), 0); -} \ No newline at end of file +} + +/// Ensure that a join completes even with the most aggressive yield configuration. +#[test] +fn join_core_yielding_aggressive() { + let time = Default::default(); + let data = timely::example(move |scope| { + let arr1 = [((0, 0), time, 1), ((1, 2), time, 1)] + .to_stream(scope) + .as_collection() + .arrange_by_key(); + let arr2 = [((0, 'a'), time, 1), ((1, 'B'), time, 1)] + .to_stream(scope) + .as_collection() + .arrange_by_key(); + + arr1.join_core_yielding( + &arr2, + |k, a, b| Some((*k, *a, *b)), + |_timer, _count| true, + ).inner.capture() + }); + + let extracted = data.extract(); + assert_eq!(extracted.len(), 1); + assert_eq!(extracted[0].1, vec![((0, 0, 'a'), time, 1), ((1, 2, 'B'), time, 1)]); +}