Skip to content

Commit 24df201

Browse files
Pointstamps and dynamic scopes (#378)
1 parent e9e1157 commit 24df201

File tree

4 files changed

+433
-0
lines changed

4 files changed

+433
-0
lines changed

examples/dynamic.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
extern crate rand;
2+
extern crate timely;
3+
extern crate differential_dataflow;
4+
5+
use rand::{Rng, SeedableRng, StdRng};
6+
7+
use timely::dataflow::*;
8+
use timely::dataflow::operators::probe::Handle;
9+
10+
use differential_dataflow::input::Input;
11+
use differential_dataflow::Collection;
12+
use differential_dataflow::operators::*;
13+
use differential_dataflow::lattice::Lattice;
14+
use differential_dataflow::logging::DifferentialEvent;
15+
16+
type Node = u32;
17+
type Edge = (Node, Node);
18+
19+
fn main() {
20+
21+
let nodes: u32 = std::env::args().nth(1).unwrap().parse().unwrap();
22+
let edges: u32 = std::env::args().nth(2).unwrap().parse().unwrap();
23+
let batch: u32 = std::env::args().nth(3).unwrap().parse().unwrap();
24+
let rounds: u32 = std::env::args().nth(4).unwrap().parse().unwrap();
25+
let inspect: bool = std::env::args().nth(5).unwrap() == "inspect";
26+
27+
// define a new computational scope, in which to run BFS
28+
timely::execute_from_args(std::env::args(), move |worker| {
29+
30+
if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") {
31+
32+
eprintln!("enabled DIFFERENTIAL logging to {}", addr);
33+
34+
if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
35+
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
36+
let mut logger = ::timely::logging::BatchLogger::new(writer);
37+
worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
38+
logger.publish_batch(time, data)
39+
);
40+
}
41+
else {
42+
panic!("Could not connect to differential log address: {:?}", addr);
43+
}
44+
}
45+
46+
let timer = ::std::time::Instant::now();
47+
48+
// define BFS dataflow; return handles to roots and edges inputs
49+
let mut probe = Handle::new();
50+
let (mut roots, mut graph) = worker.dataflow(|scope| {
51+
52+
let (root_input, roots) = scope.new_collection();
53+
let (edge_input, graph) = scope.new_collection();
54+
55+
let mut result = bfs(&graph, &roots);
56+
57+
if !inspect {
58+
result = result.filter(|_| false);
59+
}
60+
61+
result.map(|(_,l)| l)
62+
.consolidate()
63+
.inspect(|x| println!("\t{:?}", x))
64+
.probe_with(&mut probe);
65+
66+
(root_input, edge_input)
67+
});
68+
69+
let seed: &[_] = &[1, 2, 3, 4];
70+
let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
71+
let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions
72+
73+
roots.insert(0);
74+
roots.close();
75+
76+
println!("performing BFS on {} nodes, {} edges:", nodes, edges);
77+
78+
if worker.index() == 0 {
79+
for _ in 0 .. edges {
80+
graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
81+
}
82+
}
83+
84+
println!("{:?}\tloaded", timer.elapsed());
85+
86+
graph.advance_to(1);
87+
graph.flush();
88+
worker.step_or_park_while(None, || probe.less_than(graph.time()));
89+
90+
println!("{:?}\tstable", timer.elapsed());
91+
92+
for round in 0 .. rounds {
93+
for element in 0 .. batch {
94+
if worker.index() == 0 {
95+
graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
96+
graph.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
97+
}
98+
graph.advance_to(2 + round * batch + element);
99+
}
100+
graph.flush();
101+
102+
let timer2 = ::std::time::Instant::now();
103+
worker.step_or_park_while(None, || probe.less_than(&graph.time()));
104+
105+
if worker.index() == 0 {
106+
let elapsed = timer2.elapsed();
107+
println!("{:?}\t{:?}:\t{}", timer.elapsed(), round, elapsed.as_secs() * 1000000000 + (elapsed.subsec_nanos() as u64));
108+
}
109+
}
110+
println!("finished; elapsed: {:?}", timer.elapsed());
111+
}).unwrap();
112+
}
113+
114+
// returns pairs (n, s) indicating node n can be reached from a root in s steps.
115+
fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
116+
where G::Timestamp: Lattice+Ord {
117+
118+
use timely::order::Product;
119+
use iterate::Variable;
120+
use differential_dataflow::dynamic::{feedback_summary, pointstamp::PointStamp};
121+
122+
// initialize roots as reaching themselves at distance 0
123+
let nodes = roots.map(|x| (x, 0));
124+
125+
// repeatedly update minimal distances each node can be reached from each root
126+
nodes.scope().iterative::<PointStamp<usize>, _, _>(|inner| {
127+
128+
// These enter the statically bound scope, rather than any iterative scopes.
129+
// We do not *need* to enter them into the dynamic scope, as they are static
130+
// within that scope.
131+
let edges = edges.enter(inner);
132+
let nodes = nodes.enter(inner);
133+
134+
// Create a variable for label iteration.
135+
let inner = feedback_summary::<usize>(1, 1);
136+
let label = Variable::new_from(nodes.clone(), Product { outer: Default::default(), inner });
137+
138+
let next =
139+
label
140+
.join_map(&edges, |_k,l,d| (*d, l+1))
141+
.concat(&nodes)
142+
.reduce(|_, s, t| t.push((*s[0].0, 1)))
143+
;
144+
145+
label.set(&next);
146+
// Leave the dynamic iteration, stripping off the last timestamp coordinate.
147+
next
148+
.leave_dynamic(1)
149+
.inspect(|x| println!("{:?}", x))
150+
.leave()
151+
})
152+
153+
}

src/dynamic/mod.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//! Types and operators for dynamically scoped iterative dataflows.
2+
//!
3+
//! Scopes in timely dataflow are expressed statically, as part of the type system.
4+
//! This affords many efficiencies, as well as type-driven reassurance of correctness.
5+
//! However, there are times you need scopes whose organization is discovered only at runtime.
6+
//! Naiad and Materialize are examples: the latter taking arbitrary SQL into iterative dataflows.
7+
//!
8+
//! This module provides a timestamp type `Pointstamp` that can represent an update with an
9+
//! unboundedly long sequence of some `T: Timestamp`, ordered by the product order by which times
10+
//! in iterative dataflows are ordered. The module also provides methods for manipulating these
11+
//! timestamps to emulate the movement of update streams in to, within, and out of iterative scopes.
12+
//!
13+
14+
pub mod pointstamp;
15+
16+
use timely::dataflow::{Scope, scopes::Child};
17+
use timely::order::Product;
18+
use timely::progress::Timestamp;
19+
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
20+
use timely::dataflow::channels::pact::Pipeline;
21+
use timely::progress::Antichain;
22+
23+
use difference::Semigroup;
24+
use {Collection, Data};
25+
use collection::AsCollection;
26+
use dynamic::pointstamp::PointStamp;
27+
use dynamic::pointstamp::PointStampSummary;
28+
29+
impl<G, D, R, T> Collection<Child<'_, G, Product<G::Timestamp, PointStamp<T>>>, D, R>
30+
where
31+
G: Scope,
32+
D: Data,
33+
R: Semigroup,
34+
T: Timestamp+Default,
35+
{
36+
/// Enters a dynamically created scope which has `level` timestamp coordinates.
37+
pub fn enter_dynamic(&self, _level: usize) -> Self {
38+
(*self).clone()
39+
}
40+
/// Leaves a dynamically created scope which has `level` timestamp coordinates.
41+
pub fn leave_dynamic(&self, level: usize) -> Self {
42+
// Create a unary operator that will strip all but `level-1` timestamp coordinates.
43+
let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope());
44+
let (mut output, stream) = builder.new_output();
45+
let mut input = builder.new_input_connection(&self.inner, Pipeline, vec![Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } })]);
46+
47+
let mut vector = Default::default();
48+
builder.build(move |_capability| move |_frontier| {
49+
let mut output = output.activate();
50+
input.for_each(|cap, data| {
51+
data.swap(&mut vector);
52+
let mut new_time = cap.time().clone();
53+
new_time.inner.vector.truncate(level - 1);
54+
let new_cap = cap.delayed(&new_time);
55+
for (_data, time, _diff) in vector.iter_mut() {
56+
time.inner.vector.truncate(level - 1);
57+
}
58+
output.session(&new_cap).give_vec(&mut vector);
59+
});
60+
});
61+
62+
stream.as_collection()
63+
}
64+
}
65+
66+
/// Produces the summary for a feedback operator at `level`, applying `summary` to that coordinate.
67+
pub fn feedback_summary<T>(level: usize, summary: T::Summary) -> PointStampSummary<T::Summary>
68+
where
69+
T: Timestamp+Default,
70+
{
71+
PointStampSummary {
72+
retain: None,
73+
actions: std::iter::repeat(Default::default()).take(level-1).chain(std::iter::once(summary)).collect(),
74+
}
75+
}

0 commit comments

Comments
 (0)