Skip to content

Commit adf4ac7

Browse files
Add join_function opeerator (#323)
1 parent 5c3bfb8 commit adf4ac7

File tree

1 file changed

+40
-0
lines changed

1 file changed

+40
-0
lines changed

src/collection.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,46 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
270270
.as_collection()
271271
}
272272

273+
/// Joins each record against a collection defined by the function `logic`.
274+
///
275+
/// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
276+
/// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
277+
/// modifications made to the results, namely joining timestamps and multiplying differences.
278+
///
279+
/// #Examples
280+
///
281+
/// ```
282+
/// extern crate timely;
283+
/// extern crate differential_dataflow;
284+
///
285+
/// use differential_dataflow::input::Input;
286+
///
287+
/// fn main() {
288+
/// ::timely::example(|scope| {
289+
/// // creates `x` copies of `2*x` from time `3*x` until `4*x`,
290+
/// // for x from 0 through 9.
291+
/// scope.new_collection_from(0 .. 10isize).1
292+
/// .join_function(|x|
293+
/// // data time diff
294+
/// vec![(2*x, (3*x) as u64, x),
295+
/// (2*x, (4*x) as u64, -x)]
296+
/// );
297+
/// });
298+
/// }
299+
/// ```
300+
pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
301+
where G::Timestamp: Lattice,
302+
D2: Data,
303+
R2: Semigroup+Multiply<R>,
304+
<R2 as Multiply<R>>::Output: Data+Semigroup,
305+
I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
306+
L: FnMut(D)->I+'static,
307+
{
308+
self.inner
309+
.flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
310+
.as_collection()
311+
}
312+
273313
/// Brings a Collection into a nested scope.
274314
///
275315
/// # Examples

0 commit comments

Comments
 (0)