Skip to content

Commit e9e1157

Browse files
Move consolidate methods to inherent implementations (#376)
1 parent 8a04699 commit e9e1157

File tree

8 files changed

+28
-41
lines changed

8 files changed

+28
-41
lines changed

examples/accumulate.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ extern crate differential_dataflow;
55
use rand::{Rng, SeedableRng, StdRng};
66

77
use differential_dataflow::input::Input;
8-
use differential_dataflow::operators::Consolidate;
98

109
fn main() {
1110

examples/arrange.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use differential_dataflow::operators::arrange::ArrangeByKey;
1414
use differential_dataflow::operators::reduce::Reduce;
1515
use differential_dataflow::operators::join::JoinCore;
1616
use differential_dataflow::operators::Iterate;
17-
use differential_dataflow::operators::Consolidate;
1817

1918
fn main() {
2019

examples/graspan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use differential_dataflow::lattice::Lattice;
1717
use differential_dataflow::input::{Input, InputSession};
1818
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
1919
use differential_dataflow::operators::iterate::Variable;
20-
use differential_dataflow::operators::{Threshold, JoinCore, Consolidate};
20+
use differential_dataflow::operators::{Threshold, JoinCore};
2121

2222
type Node = usize;
2323
type Edge = (Node, Node);

src/collection.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,6 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
531531
R: ::ExchangeData+Hashable,
532532
G::Timestamp: Lattice+Ord,
533533
{
534-
use operators::consolidate::Consolidate;
535534
self.consolidate()
536535
.inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
537536
}
@@ -545,6 +544,7 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
545544
use timely::dataflow::scopes::ScopeParent;
546545
use timely::progress::timestamp::Refines;
547546

547+
/// Methods requiring a nested scope.
548548
impl<'a, G: Scope, T: Timestamp, D: Data, R: Semigroup> Collection<Child<'a, G, T>, D, R>
549549
where
550550
T: Refines<<G as ScopeParent>::Timestamp>,
@@ -582,6 +582,7 @@ where
582582
}
583583
}
584584

585+
/// Methods requiring a region as the scope.
585586
impl<'a, G: Scope, D: Data, R: Semigroup> Collection<Child<'a, G, G::Timestamp>, D, R>
586587
{
587588
/// Returns the value of a Collection from a nested region to its containing scope.
@@ -595,6 +596,7 @@ impl<'a, G: Scope, D: Data, R: Semigroup> Collection<Child<'a, G, G::Timestamp>,
595596
}
596597
}
597598

599+
/// Methods requiring an Abelian difference, to support negation.
598600
impl<G: Scope, D: Data, R: Abelian> Collection<G, D, R> where G::Timestamp: Data {
599601
/// Creates a new collection whose counts are the negation of those in the input.
600602
///

src/operators/consolidate.rs

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,18 @@ use timely::dataflow::Scope;
1010

1111
use ::{Collection, ExchangeData, Hashable};
1212
use ::difference::Semigroup;
13-
use operators::arrange::arrangement::Arrange;
1413

15-
/// An extension method for consolidating weighted streams.
16-
pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
14+
use Data;
15+
use lattice::Lattice;
16+
17+
/// Methods which require data be arrangeable.
18+
impl<G, D, R> Collection<G, D, R>
19+
where
20+
G: Scope,
21+
G::Timestamp: Data+Lattice,
22+
D: ExchangeData+Hashable,
23+
R: Semigroup+ExchangeData,
24+
{
1725
/// Aggregates the weights of equal records into at most one record.
1826
///
1927
/// This method uses the type `D`'s `hashed()` method to partition the data. The data are
@@ -26,7 +34,6 @@ pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
2634
/// extern crate differential_dataflow;
2735
///
2836
/// use differential_dataflow::input::Input;
29-
/// use differential_dataflow::operators::Consolidate;
3037
///
3138
/// fn main() {
3239
/// ::timely::example(|scope| {
@@ -40,30 +47,23 @@ pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
4047
/// });
4148
/// }
4249
/// ```
43-
fn consolidate(&self) -> Self {
44-
self.consolidate_named("Consolidate")
50+
pub fn consolidate(&self) -> Self {
51+
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
52+
self.consolidate_named::<DefaultKeyTrace<_,_,_>>("Consolidate")
4553
}
4654

47-
/// As `consolidate` but with the ability to name the operator.
48-
fn consolidate_named(&self, name: &str) -> Self;
49-
}
50-
51-
impl<G: Scope, D, R> Consolidate<D> for Collection<G, D, R>
52-
where
53-
D: ExchangeData+Hashable,
54-
R: ExchangeData+Semigroup,
55-
G::Timestamp: ::lattice::Lattice+Ord,
56-
{
57-
fn consolidate_named(&self, name: &str) -> Self {
58-
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
55+
/// As `consolidate` but with the ability to name the operator and specify the trace type.
56+
pub fn consolidate_named<Tr>(&self, name: &str) -> Self
57+
where
58+
Tr: crate::trace::Trace+crate::trace::TraceReader<Key=D,Val=(),Time=G::Timestamp,R=R>+'static,
59+
Tr::Batch: crate::trace::Batch,
60+
{
61+
use operators::arrange::arrangement::Arrange;
5962
self.map(|k| (k, ()))
60-
.arrange_named::<DefaultKeyTrace<_,_,_>>(name)
63+
.arrange_named::<Tr>(name)
6164
.as_collection(|d: &D, _| d.clone())
6265
}
63-
}
6466

65-
/// An extension method for consolidating weighted streams.
66-
pub trait ConsolidateStream<D: ExchangeData+Hashable> {
6767
/// Aggregates the weights of equal records.
6868
///
6969
/// Unlike `consolidate`, this method does not exchange data and does not
@@ -79,7 +79,6 @@ pub trait ConsolidateStream<D: ExchangeData+Hashable> {
7979
/// extern crate differential_dataflow;
8080
///
8181
/// use differential_dataflow::input::Input;
82-
/// use differential_dataflow::operators::consolidate::ConsolidateStream;
8382
///
8483
/// fn main() {
8584
/// ::timely::example(|scope| {
@@ -93,16 +92,7 @@ pub trait ConsolidateStream<D: ExchangeData+Hashable> {
9392
/// });
9493
/// }
9594
/// ```
96-
fn consolidate_stream(&self) -> Self;
97-
}
98-
99-
impl<G: Scope, D, R> ConsolidateStream<D> for Collection<G, D, R>
100-
where
101-
D: ExchangeData+Hashable,
102-
R: ExchangeData+Semigroup,
103-
G::Timestamp: ::lattice::Lattice+Ord,
104-
{
105-
fn consolidate_stream(&self) -> Self {
95+
pub fn consolidate_stream(&self) -> Self {
10696

10797
use timely::dataflow::channels::pact::Pipeline;
10898
use timely::dataflow::operators::Operator;

src/operators/iterate.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ pub trait Iterate<G: Scope, D: Data, R: Semigroup> {
6363
///
6464
/// use differential_dataflow::input::Input;
6565
/// use differential_dataflow::operators::Iterate;
66-
/// use differential_dataflow::operators::Consolidate;
6766
///
6867
/// fn main() {
6968
/// ::timely::example(|scope| {
@@ -145,7 +144,6 @@ impl<G: Scope, D: Ord+Data+Debug, R: Semigroup> Iterate<G, D, R> for G {
145144
///
146145
/// use differential_dataflow::input::Input;
147146
/// use differential_dataflow::operators::iterate::Variable;
148-
/// use differential_dataflow::operators::Consolidate;
149147
///
150148
/// fn main() {
151149
/// ::timely::example(|scope| {

src/operators/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
//! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`).
66
77
pub use self::reduce::{Reduce, Threshold, Count};
8-
pub use self::consolidate::Consolidate;
98
pub use self::iterate::Iterate;
109
pub use self::join::{Join, JoinCore};
1110
pub use self::count::CountTotal;

tests/join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ extern crate differential_dataflow;
44
use timely::dataflow::operators::{ToStream, Capture, Map};
55
use timely::dataflow::operators::capture::Extract;
66
use differential_dataflow::AsCollection;
7-
use differential_dataflow::operators::{Consolidate, Join, Count};
7+
use differential_dataflow::operators::{Join, Count};
88

99
#[test]
1010
fn join() {

0 commit comments

Comments
 (0)