Skip to content

Commit aed365f

Browse files
committed
Updated propagate_core documentation
1 parent e461000 commit aed365f

File tree

3 files changed

+67
-50
lines changed

3 files changed

+67
-50
lines changed

src/algorithms/graphs/propagate.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@ use operators::arrange::arrangement::Arranged;
5353
/// Propagates labels forward, retaining the minimum label.
5454
///
5555
/// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows
56-
/// a method `logic` to specify the rounds in which we introduce various labels. The output
57-
/// of `logic should be a number in the interval [0,64],
56+
/// a method `logic` to specify the rounds in which we introduce various labels.
5857
pub fn propagate_core<G, N, L, Tr, F, R>(edges: &Arranged<G,Tr>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
5958
where
6059
G: Scope,
@@ -100,7 +99,7 @@ where
10099
let labels =
101100
proposals
102101
.concat(&nodes)
103-
.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1 as i8))));
102+
.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1))));
104103

105104
let propagate: Collection<_, (N, L), R> =
106105
labels

src/operators/consolidate.rs

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,27 @@
66
//! underlying system can more clearly see that no work must be done in the later case, and we can
77
//! drop out of, e.g. iterative computations.
88
9-
use timely::dataflow::Scope;
10-
11-
use ::{Collection, ExchangeData, Hashable};
12-
use ::difference::Semigroup;
13-
use operators::arrange::arrangement::Arrange;
9+
use crate::{
10+
difference::Semigroup,
11+
lattice::Lattice,
12+
operators::arrange::{Arrange, Arranged, TraceAgent},
13+
trace::{implementations::ord::OrdKeySpine, Batch, Cursor, Trace, TraceReader},
14+
AsCollection, Collection, ExchangeData, Hashable,
15+
};
16+
use timely::dataflow::{channels::pact::Pipeline, operators::Operator, Scope};
1417

1518
/// An extension method for consolidating weighted streams.
16-
pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
19+
pub trait Consolidate<S, D, R>: Sized
20+
where
21+
S: Scope,
22+
S::Timestamp: Lattice,
23+
D: ExchangeData + Hashable,
24+
R: Semigroup,
25+
{
1726
/// Aggregates the weights of equal records into at most one record.
1827
///
19-
/// This method uses the type `D`'s `hashed()` method to partition the data. The data are
20-
/// accumulated in place, each held back until their timestamp has completed.
28+
/// This method uses the type `D`'s [`hashed()`](Hashable) method to partition the data.
29+
/// The data is accumulated in place and held back until its timestamp has completed.
2130
///
2231
/// # Examples
2332
///
@@ -40,30 +49,44 @@ pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
4049
/// });
4150
/// }
4251
/// ```
43-
fn consolidate(&self) -> Self {
52+
fn consolidate(&self) -> Collection<S, D, R> {
4453
self.consolidate_named("Consolidate")
4554
}
4655

47-
/// As `consolidate` but with the ability to name the operator.
48-
fn consolidate_named(&self, name: &str) -> Self;
56+
/// As `consolidate`, but with the ability to name the operator.
57+
fn consolidate_named(&self, name: &str) -> Collection<S, D, R> {
58+
self.consolidate_core::<OrdKeySpine<_, _, _>>(name)
59+
.as_collection(|data, &()| data.clone())
60+
}
61+
62+
/// Aggregates the weights of equal records into at most one record,
63+
/// returning the intermediate [arrangement](Arranged)
64+
fn consolidate_core<Tr>(&self, name: &str) -> Arranged<S, TraceAgent<Tr>>
65+
where
66+
Tr: Trace + TraceReader<Key = D, Val = (), Time = S::Timestamp, R = R> + 'static,
67+
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
68+
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>;
4969
}
5070

51-
impl<G: Scope, D, R> Consolidate<D> for Collection<G, D, R>
71+
impl<S, D, R> Consolidate<S, D, R> for Collection<S, D, R>
5272
where
53-
D: ExchangeData+Hashable,
54-
R: ExchangeData+Semigroup,
55-
G::Timestamp: ::lattice::Lattice+Ord,
56-
{
57-
fn consolidate_named(&self, name: &str) -> Self {
58-
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
59-
self.map(|k| (k, ()))
60-
.arrange_named::<DefaultKeyTrace<_,_,_>>(name)
61-
.as_collection(|d: &D, _| d.clone())
73+
S: Scope,
74+
S::Timestamp: Lattice + Ord,
75+
D: ExchangeData + Hashable,
76+
R: ExchangeData + Semigroup,
77+
{
78+
fn consolidate_core<Tr>(&self, name: &str) -> Arranged<S, TraceAgent<Tr>>
79+
where
80+
Tr: Trace + TraceReader<Key = D, Val = (), Time = S::Timestamp, R = R> + 'static,
81+
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
82+
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
83+
{
84+
self.map(|key| (key, ())).arrange_named(name)
6285
}
6386
}
6487

6588
/// An extension method for consolidating weighted streams.
66-
pub trait ConsolidateStream<D: ExchangeData+Hashable> {
89+
pub trait ConsolidateStream<D: ExchangeData + Hashable> {
6790
/// Aggregates the weights of equal records.
6891
///
6992
/// Unlike `consolidate`, this method does not exchange data and does not
@@ -98,19 +121,13 @@ pub trait ConsolidateStream<D: ExchangeData+Hashable> {
98121

99122
impl<G: Scope, D, R> ConsolidateStream<D> for Collection<G, D, R>
100123
where
101-
D: ExchangeData+Hashable,
102-
R: ExchangeData+Semigroup,
103-
G::Timestamp: ::lattice::Lattice+Ord,
104-
{
124+
D: ExchangeData + Hashable,
125+
R: ExchangeData + Semigroup,
126+
G::Timestamp: ::lattice::Lattice + Ord,
127+
{
105128
fn consolidate_stream(&self) -> Self {
106-
107-
use timely::dataflow::channels::pact::Pipeline;
108-
use timely::dataflow::operators::Operator;
109-
use collection::AsCollection;
110-
111129
self.inner
112130
.unary(Pipeline, "ConsolidateStream", |_cap, _info| {
113-
114131
let mut vector = Vec::new();
115132
move |input, output| {
116133
input.for_each(|time, data| {

src/operators/reduce.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattic
151151
R2: Abelian,
152152
F: FnMut(&K, &R1) -> R2 + 'static,
153153
Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
154-
Tr::Batch: Batch<K, (), G::Timestamp, R2>,
155-
Tr::Cursor: Cursor<K, (), G::Timestamp, R2>;
154+
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
155+
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>;
156156

157157
/// Reduces the collection to one occurrence of each distinct element.
158158
///
@@ -209,8 +209,8 @@ pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattic
209209
where
210210
R2: Abelian + From<i8>,
211211
Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
212-
Tr::Batch: Batch<K, (), G::Timestamp, R2>,
213-
Tr::Cursor: Cursor<K, (), G::Timestamp, R2>,
212+
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
213+
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
214214
{
215215
self.threshold_core(name, |_, _| R2::from(1i8))
216216
}
@@ -226,8 +226,8 @@ where
226226
R2: Abelian,
227227
F: FnMut(&K, &R1) -> R2 + 'static,
228228
Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
229-
Tr::Batch: Batch<K, (), G::Timestamp, R2>,
230-
Tr::Cursor: Cursor<K, (), G::Timestamp, R2>,
229+
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
230+
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
231231
{
232232
self.arrange_by_self_named(&format!("Arrange: {}", name))
233233
.threshold_core(name, thresh)
@@ -246,8 +246,8 @@ where
246246
R2: Abelian,
247247
F: FnMut(&K, &R1) -> R2 + 'static,
248248
Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
249-
Tr::Batch: Batch<K, (), G::Timestamp, R2>,
250-
Tr::Cursor: Cursor<K, (), G::Timestamp, R2>,
249+
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
250+
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
251251
{
252252
self.reduce_abelian::<_, Tr>(name, move |k, s, t| {
253253
t.push(((), thresh(k, &s[0].1)));
@@ -292,22 +292,23 @@ pub trait Count<G: Scope, K: Data, R: Semigroup> where G::Timestamp: Lattice+Ord
292292
///
293293
/// use differential_dataflow::input::Input;
294294
/// use differential_dataflow::operators::Count;
295+
/// use differential_dataflow::operators::arrange::OrdValSpine;
295296
///
296297
/// fn main() {
297298
/// ::timely::example(|scope| {
298299
/// // report the number of occurrences of each key
299300
/// scope.new_collection_from(1 .. 10).1
300301
/// .map(|x| x / 3)
301-
/// .count();
302+
/// .count_core::<isize, OrdValSpine<_, _, _, _>>();
302303
/// });
303304
/// }
304305
/// ```
305306
fn count_core<R2, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
306307
where
307308
R2: Abelian + From<i8>,
308309
Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
309-
Tr::Batch: Batch<K, R, G::Timestamp, R2>,
310-
Tr::Cursor: Cursor<K, R, G::Timestamp, R2>;
310+
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
311+
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>;
311312
}
312313

313314
impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count<G, K, R> for Collection<G, K, R>
@@ -318,8 +319,8 @@ where
318319
where
319320
R2: Abelian + From<i8>,
320321
Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
321-
Tr::Batch: Batch<K, R, G::Timestamp, R2>,
322-
Tr::Cursor: Cursor<K, R, G::Timestamp, R2>,
322+
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
323+
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
323324
{
324325
self.arrange_by_self_named("Arrange: Count")
325326
.count_core()
@@ -337,8 +338,8 @@ where
337338
where
338339
R2: Abelian + From<i8>,
339340
Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
340-
Tr::Batch: Batch<K, R, G::Timestamp, R2>,
341-
Tr::Cursor: Cursor<K, R, G::Timestamp, R2>,
341+
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
342+
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
342343
{
343344
self.reduce_abelian::<_, Tr>("Count", |_k, s, t| t.push((s[0].1.clone(), R2::from(1))))
344345
}

0 commit comments

Comments
 (0)