Skip to content

Commit e461000

Browse files
committed
Added arranged methods for threshold, distinct and count
1 parent 2aa27dd commit e461000

File tree

1 file changed

+125
-17
lines changed

1 file changed

+125
-17
lines changed

src/operators/reduce.rs

Lines changed: 125 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,28 @@ pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattic
131131
}
132132

133133
/// A `threshold` with the ability to name the operator.
134-
fn threshold_named<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2>;
134+
fn threshold_named<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2> {
135+
self.threshold_core::<R2, _, DefaultKeyTrace<_, _, _>>(name, thresh)
136+
.as_collection(|key: &K, &()| key.clone())
137+
}
138+
139+
/// A `threshold` that returns its result as an [arrangement](Arranged) rather than a collection.
140+
fn threshold_arranged<R2, F>(&self, threshold: F) -> Arranged<G, TraceAgent<DefaultKeyTrace<K, G::Timestamp, R2>>>
141+
where
142+
R2: Abelian,
143+
F: FnMut(&K, &R1) -> R2 + 'static,
144+
{
145+
self.threshold_core("Threshold", threshold)
146+
}
147+
148+
/// The inner logic behind `threshold`: it allows naming the operator and returns an arrangement.
149+
fn threshold_core<R2, F, Tr>(&self, name: &str, threshold: F) -> Arranged<G, TraceAgent<Tr>>
150+
where
151+
R2: Abelian,
152+
F: FnMut(&K, &R1) -> R2 + 'static,
153+
Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
154+
Tr::Batch: Batch<K, (), G::Timestamp, R2>,
155+
Tr::Cursor: Cursor<K, (), G::Timestamp, R2>;
135156

136157
/// Reduces the collection to one occurrence of each distinct element.
137158
///
@@ -163,28 +184,74 @@ pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattic
163184
/// type is something other than an `isize` integer, for example perhaps an
164185
/// `i32`.
165186
fn distinct_core<R2: Abelian+From<i8>>(&self) -> Collection<G, K, R2> {
166-
self.threshold_named("Distinct", |_,_| R2::from(1i8))
187+
self.distinct_arranged()
188+
.as_collection(|key, &()| key.clone())
189+
}
190+
191+
/// Distinct for general integer differences that returns an [arrangement](Arranged)
192+
///
193+
/// This method allows `distinct` to produce arrangements whose difference
194+
/// type is something other than an `isize` integer, for example perhaps an
195+
/// `i32`.
196+
fn distinct_arranged<R2>(&self) -> Arranged<G, TraceAgent<DefaultKeyTrace<K, G::Timestamp, R2>>>
197+
where
198+
R2: Abelian + From<i8>,
199+
{
200+
self.distinct_arranged_core("Distinct")
201+
}
202+
203+
/// Distinct for general integer differences that returns an [arrangement](Arranged), with the ability to name the operator and choose the trace type.
204+
///
205+
/// This method allows `distinct` to produce arrangements whose difference
206+
/// type is something other than an `isize` integer, for example perhaps an
207+
/// `i32`.
208+
fn distinct_arranged_core<R2, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
209+
where
210+
R2: Abelian + From<i8>,
211+
Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
212+
Tr::Batch: Batch<K, (), G::Timestamp, R2>,
213+
Tr::Cursor: Cursor<K, (), G::Timestamp, R2>,
214+
{
215+
self.threshold_core(name, |_, _| R2::from(1i8))
167216
}
168217
}
169218

170-
impl<G: Scope, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold<G, K, R1> for Collection<G, K, R1>
171-
where G::Timestamp: Lattice+Ord {
172-
fn threshold_named<R2: Abelian, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2> {
219+
impl<G: Scope, K: ExchangeData + Hashable, R1: ExchangeData + Semigroup> Threshold<G, K, R1>
220+
for Collection<G, K, R1>
221+
where
222+
G::Timestamp: Lattice + Ord,
223+
{
224+
fn threshold_core<R2, F, Tr>(&self, name: &str, thresh: F) -> Arranged<G, TraceAgent<Tr>>
225+
where
226+
R2: Abelian,
227+
F: FnMut(&K, &R1) -> R2 + 'static,
228+
Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
229+
Tr::Batch: Batch<K, (), G::Timestamp, R2>,
230+
Tr::Cursor: Cursor<K, (), G::Timestamp, R2>,
231+
{
173232
self.arrange_by_self_named(&format!("Arrange: {}", name))
174-
.threshold_named(name, thresh)
233+
.threshold_core(name, thresh)
175234
}
176235
}
177236

178237
impl<G: Scope, K: Data, T1, R1: Semigroup> Threshold<G, K, R1> for Arranged<G, T1>
179238
where
180-
G::Timestamp: Lattice+Ord,
181-
T1: TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R1>+Clone+'static,
239+
G::Timestamp: Lattice + Ord,
240+
T1: TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R1> + Clone + 'static,
182241
T1::Batch: BatchReader<K, (), G::Timestamp, R1>,
183242
T1::Cursor: Cursor<K, (), G::Timestamp, R1>,
184243
{
185-
fn threshold_named<R2: Abelian, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
186-
self.reduce_abelian::<_,DefaultKeyTrace<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
187-
.as_collection(|k,_| k.clone())
244+
fn threshold_core<R2, F, Tr>(&self, name: &str, mut thresh: F) -> Arranged<G, TraceAgent<Tr>>
245+
where
246+
R2: Abelian,
247+
F: FnMut(&K, &R1) -> R2 + 'static,
248+
Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
249+
Tr::Batch: Batch<K, (), G::Timestamp, R2>,
250+
Tr::Cursor: Cursor<K, (), G::Timestamp, R2>,
251+
{
252+
self.reduce_abelian::<_, Tr>(name, move |k, s, t| {
253+
t.push(((), thresh(k, &s[0].1)));
254+
})
188255
}
189256
}
190257

@@ -210,16 +277,52 @@ pub trait Count<G: Scope, K: Data, R: Semigroup> where G::Timestamp: Lattice+Ord
210277
/// });
211278
/// }
212279
/// ```
213-
fn count(&self) -> Collection<G, (K, R), isize>;
280+
fn count(&self) -> Collection<G, (K, R), isize> {
281+
self.count_core::<_, DefaultValTrace<_, _, _, _>>()
282+
.as_collection(|key, count| (key.clone(), count.clone()))
283+
}
284+
285+
/// Counts the number of occurrences of each element, returning an [arrangement](Arranged)
286+
///
287+
/// # Examples
288+
///
289+
/// ```
290+
/// extern crate timely;
291+
/// extern crate differential_dataflow;
292+
///
293+
/// use differential_dataflow::input::Input;
294+
/// use differential_dataflow::operators::Count;
295+
///
296+
/// fn main() {
297+
/// ::timely::example(|scope| {
298+
/// // report the number of occurrences of each key
299+
/// scope.new_collection_from(1 .. 10).1
300+
/// .map(|x| x / 3)
301+
/// .count();
302+
/// });
303+
/// }
304+
/// ```
305+
fn count_core<R2, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
306+
where
307+
R2: Abelian + From<i8>,
308+
Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
309+
Tr::Batch: Batch<K, R, G::Timestamp, R2>,
310+
Tr::Cursor: Cursor<K, R, G::Timestamp, R2>;
214311
}
215312

216313
impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count<G, K, R> for Collection<G, K, R>
217314
where
218315
G::Timestamp: Lattice+Ord,
219316
{
220-
fn count(&self) -> Collection<G, (K, R), isize> {
317+
fn count_core<R2, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
318+
where
319+
R2: Abelian + From<i8>,
320+
Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
321+
Tr::Batch: Batch<K, R, G::Timestamp, R2>,
322+
Tr::Cursor: Cursor<K, R, G::Timestamp, R2>,
323+
{
221324
self.arrange_by_self_named("Arrange: Count")
222-
.count()
325+
.count_core()
223326
}
224327
}
225328

@@ -230,9 +333,14 @@ where
230333
T1::Batch: BatchReader<K, (), G::Timestamp, R>,
231334
T1::Cursor: Cursor<K, (), G::Timestamp, R>,
232335
{
233-
fn count(&self) -> Collection<G, (K, R), isize> {
234-
self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), 1)))
235-
.as_collection(|k,c| (k.clone(), c.clone()))
336+
fn count_core<R2, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
337+
where
338+
R2: Abelian + From<i8>,
339+
Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
340+
Tr::Batch: Batch<K, R, G::Timestamp, R2>,
341+
Tr::Cursor: Cursor<K, R, G::Timestamp, R2>,
342+
{
343+
self.reduce_abelian::<_, Tr>("Count", |_k, s, t| t.push((s[0].1.clone(), R2::from(1))))
236344
}
237345
}
238346

0 commit comments

Comments
 (0)