
Commit 93da913

Generalize arrangements to Containers (#281)
* Generalize Trie layers to containers: generalized implementation; generalize ord to containers; very silly error; prototype column container; demonstrate potential improvement; tabs to spaces; enable regions in batch formation; fix test

* Rename Container to BatchContainer, use the columnation export from Timely

* Expose capacity/reserve of TimelyStack

* Tidy comments and whitespace

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
Co-authored-by: Moritz Hoffmann <antiguru@gmail.com>
1 parent 78e7319 commit 93da913

File tree: 10 files changed, +483 -208 lines changed

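The core of the change is that trie layers no longer assume their keys and values are stored in a `Vec`, but in any type that offers a small batch-container interface, so a columnar region such as timely's `TimelyStack` can be swapped in (the "columnation export from Timely" above). As a rough illustration only, the trait and method names below are hypothetical and are not the definitions this commit introduces:

// Hypothetical sketch of a batch-container interface; not the trait added by
// this commit. The point is that both `Vec<T>` and a columnar store (such as
// timely's `TimelyStack<T>`, which also exposes `capacity` and `reserve`) can
// implement one storage interface that trie layers build batches against.
trait BatchContainerSketch {
    type Item;
    fn with_capacity(capacity: usize) -> Self;
    fn copy(&mut self, item: &Self::Item);
    fn index(&self, index: usize) -> &Self::Item;
    fn len(&self) -> usize;
}

impl<T: Clone> BatchContainerSketch for Vec<T> {
    type Item = T;
    fn with_capacity(capacity: usize) -> Self { Vec::with_capacity(capacity) }
    fn copy(&mut self, item: &T) { self.push(item.clone()); }
    fn index(&self, index: usize) -> &T { &self[index] }
    fn len(&self) -> usize { Vec::len(self) }
}

fn main() {
    let mut container: Vec<String> = BatchContainerSketch::with_capacity(2);
    container.copy(&"key".to_string());
    assert_eq!(BatchContainerSketch::len(&container), 1);
    assert_eq!(container.index(0), "key");
}

A columnar implementation would keep its items in large contiguous regions rather than as individual heap allocations, which is what the new ColKeySpine in the example below exercises against the existing OrdKeySpine.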

examples/columnation.rs

Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
extern crate timely;
extern crate differential_dataflow;

use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;

fn main() {

    let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap();
    let size: usize = std::env::args().nth(2).unwrap().parse().unwrap();

    let mode = std::env::args().any(|a| a == "new");

    if mode {
        println!("Running NEW arrangement");
    }
    else {
        println!("Running OLD arrangement");
    }

    let timer1 = ::std::time::Instant::now();
    let timer2 = timer1.clone();

    // define a new computational scope, in which to compare the arrangements
    timely::execute_from_args(std::env::args(), move |worker| {

        // define a dataflow that arranges `data` and `keys` and joins them;
        // return handles to the two inputs
        let mut probe = Handle::new();
        let (mut data_input, mut keys_input) = worker.dataflow(|scope| {

            use differential_dataflow::operators::{arrange::Arrange, JoinCore};
            use differential_dataflow::trace::implementations::ord::{OrdKeySpine, ColKeySpine};

            let (data_input, data) = scope.new_collection::<String, isize>();
            let (keys_input, keys) = scope.new_collection::<String, isize>();

            if mode {
                let data = data.arrange::<ColKeySpine<_,_,_>>();
                let keys = keys.arrange::<ColKeySpine<_,_,_>>();
                keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
                    .probe_with(&mut probe);
            }
            else {
                let data = data.arrange::<OrdKeySpine<_,_,_>>();
                let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
                keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
                    .probe_with(&mut probe);
            }

            (data_input, keys_input)
        });

        // Load up data in batches.
        let mut counter = 0;
        while counter < 10 * keys {
            let mut i = worker.index();
            while i < size {
                let val = (counter + i) % keys;
                data_input.insert(format!("{:?}", val));
                i += worker.peers();
            }
            counter += size;
            data_input.advance_to(data_input.time() + 1);
            data_input.flush();
            keys_input.advance_to(keys_input.time() + 1);
            keys_input.flush();
            while probe.less_than(data_input.time()) {
                worker.step();
            }
        }
        println!("{:?}\tloading complete", timer1.elapsed());

        // Issue queries in batches.
        let mut queries = 0;

        while queries < 10 * keys {
            let mut i = worker.index();
            while i < size {
                let val = (queries + i) % keys;
                keys_input.insert(format!("{:?}", val));
                i += worker.peers();
            }
            queries += size;
            data_input.advance_to(data_input.time() + 1);
            data_input.flush();
            keys_input.advance_to(keys_input.time() + 1);
            keys_input.flush();
            while probe.less_than(data_input.time()) {
                worker.step();
            }
        }

        println!("{:?}\tqueries complete", timer1.elapsed());

        // loop { }

    }).unwrap();

    println!("{:?}\tshut down", timer2.elapsed());

}
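As a rough guide to running this comparison (the exact figures are illustrative, not taken from the commit): the example reads the number of distinct keys and the per-round batch size from its first two positional arguments, and switches to the new columnar arrangement when any argument equals "new", so e.g. `cargo run --release --example columnation -- 1000000 1000000 new`, followed by the same invocation without `new`, runs the two variants back to back.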

src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -71,6 +71,8 @@
 //! of the new and old counts of the old and new degrees of the affected node).
 
 #![forbid(missing_docs)]
+#![allow(array_into_iter)]
+
 
 use std::fmt::Debug;
 
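The added `#![allow(array_into_iter)]` silences rustc's `array_into_iter` lint crate-wide; presumably some code touched by this change calls `.into_iter()` on arrays, which pre-2021-edition compilers warn iterates over references rather than values.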
src/operators/arrange/arrangement.rs

Lines changed: 1 addition & 5 deletions
@@ -542,9 +542,6 @@ where
         // Capabilities for the lower envelope of updates in `batcher`.
         let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
 
-        let mut buffer = Vec::new();
-
-
         let (activator, effort) =
         if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
             (Some(self.scope().activator_for(&info.address[..])), Some(effort))
@@ -569,8 +566,7 @@ where
 
             input.for_each(|cap, data| {
                 capabilities.insert(cap.retain());
-                data.swap(&mut buffer);
-                batcher.push_batch(&mut buffer);
+                batcher.push_batch(data);
             });
 
             // The frontier may have advanced by multiple elements, which is an issue because
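With the batcher's `push_batch` now accepting the input buffer in timely's `RefOrMut` wrapper directly (see the merge_batcher change below), the arrange operator no longer needs a staging `buffer` to swap incoming data into; the choice between cloning shared data and taking ownership of an owned buffer moves into the batcher.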

src/trace/implementations/merge_batcher.rs

Lines changed: 15 additions & 4 deletions
@@ -1,5 +1,6 @@
 //! A general purpose `Batcher` implementation based on radix sort.
 
+use timely::communication::message::RefOrMut;
 use timely::progress::frontier::Antichain;
 
 use ::difference::Semigroup;
@@ -33,8 +34,20 @@ where
     }
 
     #[inline(never)]
-    fn push_batch(&mut self, batch: &mut Vec<((B::Key,B::Val),B::Time,B::R)>) {
-        self.sorter.push(batch);
+    fn push_batch(&mut self, batch: RefOrMut<Vec<((B::Key,B::Val),B::Time,B::R)>>) {
+        // `batch` is either a shared reference or an owned allocation.
+        match batch {
+            RefOrMut::Ref(reference) => {
+                // This is a moment at which we could capture the allocations backing
+                // `batch` into a different form of region, rather than just cloning.
+                let mut owned: Vec<_> = self.sorter.empty();
+                owned.clone_from(reference);
+                self.sorter.push(&mut owned);
+            },
+            RefOrMut::Mut(reference) => {
+                self.sorter.push(reference);
+            }
+        }
     }
 
     // Sealing a batch means finding those updates with times not greater or equal to any time
@@ -58,7 +71,6 @@ where
         for mut buffer in merged.drain(..) {
             for ((key, val), time, diff) in buffer.drain(..) {
                 if upper.less_equal(&time) {
-                    // keep_count += 1;
                     self.frontier.insert(time.clone());
                     if keep.len() == keep.capacity() {
                         if keep.len() > 0 {
@@ -69,7 +81,6 @@ where
                     keep.push(((key, val), time, diff));
                 }
                 else {
-                    // seal_count += 1;
                     builder.push((key, val, time, diff));
                 }
             }
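To see the `RefOrMut` pattern above in isolation, here is a self-contained sketch of the same idea; `RefOrMut` and the toy `Sorter` are modeled locally and are not the types from timely or this crate. A consumer clones a shared batch into a recycled buffer, but absorbs an owned batch without copying:

// Self-contained sketch of the pattern used by `push_batch` above; `RefOrMut`
// and `Sorter` are modeled locally, not taken from timely or differential.
enum RefOrMut<'a, T> {
    Ref(&'a T),
    Mut(&'a mut T),
}

struct Sorter<T> {
    staged: Vec<Vec<T>>,
}

impl<T> Sorter<T> {
    // Hand back an empty buffer; a real sorter would recycle prior allocations here.
    fn empty(&mut self) -> Vec<T> {
        Vec::new()
    }
    // Absorb the buffer's contents, leaving an empty vector with the caller.
    fn push(&mut self, buffer: &mut Vec<T>) {
        self.staged.push(std::mem::take(buffer));
    }
}

fn push_batch<T: Clone>(sorter: &mut Sorter<T>, batch: RefOrMut<Vec<T>>) {
    match batch {
        // Shared reference: clone the contents into a fresh buffer.
        RefOrMut::Ref(reference) => {
            let mut owned = sorter.empty();
            owned.clone_from(reference);
            sorter.push(&mut owned);
        },
        // Owned contents: move them in without copying.
        RefOrMut::Mut(reference) => {
            sorter.push(reference);
        }
    }
}

fn main() {
    let mut sorter = Sorter { staged: Vec::new() };
    let shared = vec![1, 2, 3];
    push_batch(&mut sorter, RefOrMut::Ref(&shared));
    let mut owned = vec![4, 5, 6];
    push_batch(&mut sorter, RefOrMut::Mut(&mut owned));
    assert_eq!(sorter.staged.len(), 2);
    assert!(owned.is_empty()); // the owned buffer was taken, not cloned
}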
