Skip to content

Commit 3450dd7

Browse files
committed
Arranged consolidation
1 parent aed365f commit 3450dd7

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

.vscode/settings.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"cSpell.words": [
3+
"Hashable"
4+
]
5+
}

src/operators/consolidate.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,34 @@ where
5959
.as_collection(|data, &()| data.clone())
6060
}
6161

62+
/// A `consolidate` that returns the intermediate [arrangement](Arranged)
63+
///
64+
/// # Example
65+
///
66+
/// ```rust
67+
/// use differential_dataflow::{
68+
/// input::Input,
69+
/// operators::{Consolidate, JoinCore},
70+
/// };
71+
///
72+
/// timely::example(|scope| {
73+
/// let (_, collection) = scope.new_collection_from(0..10u32);
74+
///
75+
/// let keys = collection
76+
/// .flat_map(|x| (0..x))
77+
/// .concat(&collection.negate())
78+
/// .consolidate_arranged();
79+
///
80+
/// collection
81+
/// .map(|x| (x, x * 2))
82+
/// .join_core(&keys, |&key, &(), &value| (key, value))
83+
/// .inspect(|x| println!("{:?}", x));
84+
/// });
85+
/// ```
86+
fn consolidate_arranged(&self) -> Arranged<S, TraceAgent<OrdKeySpine<D, S::Timestamp, R>>> {
87+
self.consolidate_core::<OrdKeySpine<_, _, _>>("Consolidate")
88+
}
89+
6290
/// Aggregates the weights of equal records into at most one record,
6391
/// returning the intermediate [arrangement](Arranged)
6492
fn consolidate_core<Tr>(&self, name: &str) -> Arranged<S, TraceAgent<Tr>>
@@ -123,7 +151,7 @@ impl<G: Scope, D, R> ConsolidateStream<D> for Collection<G, D, R>
123151
where
124152
D: ExchangeData + Hashable,
125153
R: ExchangeData + Semigroup,
126-
G::Timestamp: ::lattice::Lattice + Ord,
154+
G::Timestamp: Lattice + Ord,
127155
{
128156
fn consolidate_stream(&self) -> Self {
129157
self.inner

0 commit comments

Comments
 (0)