Skip to content

Commit 0feeb66

Browse files
committed
Require handoffs and subgraphs to have friendly names
1 parent 2b9fac8 commit 0feeb66

File tree

21 files changed

+559
-294
lines changed

21 files changed

+559
-294
lines changed

benches/benches/fork_join.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,34 +15,41 @@ fn benchmark_hydroflow(c: &mut Criterion) {
1515
b.iter(|| {
1616
let mut df = Hydroflow::new();
1717

18-
let (start_send, start_recv) = df.make_edge::<VecHandoff<usize>>();
18+
let (start_send, start_recv) = df.make_edge::<VecHandoff<usize>>("start".into());
1919

2020
let mut sent = false;
21-
df.add_subgraph_source(start_send, move |_ctx, send| {
21+
df.add_subgraph_source("source".into(), start_send, move |_ctx, send| {
2222
if !sent {
2323
sent = true;
2424
send.give(Iter(0..NUM_INTS));
2525
}
2626
});
2727

28-
let (send1, mut recv1) = df.make_edge::<VecHandoff<_>>();
29-
let (send2, mut recv2) = df.make_edge::<VecHandoff<_>>();
30-
31-
df.add_subgraph_in_2out(start_recv, send1, send2, |_ctx, recv, send1, send2| {
32-
for v in recv.take_inner().into_iter() {
33-
if v % 2 == 0 {
34-
send1.give(Some(v));
35-
} else {
36-
send2.give(Some(v));
28+
let (send1, mut recv1) = df.make_edge::<VecHandoff<_>>("1".into());
29+
let (send2, mut recv2) = df.make_edge::<VecHandoff<_>>("2".into());
30+
31+
df.add_subgraph_in_2out(
32+
"fork".into(),
33+
start_recv,
34+
send1,
35+
send2,
36+
|_ctx, recv, send1, send2| {
37+
for v in recv.take_inner().into_iter() {
38+
if v % 2 == 0 {
39+
send1.give(Some(v));
40+
} else {
41+
send2.give(Some(v));
42+
}
3743
}
38-
}
39-
});
44+
},
45+
);
4046

4147
for _ in 0..NUM_OPS {
42-
let (send1, next_recv1) = df.make_edge();
43-
let (send2, next_recv2) = df.make_edge();
48+
let (send1, next_recv1) = df.make_edge("1".into());
49+
let (send2, next_recv2) = df.make_edge("2".into());
4450

4551
df.add_subgraph_2in_2out(
52+
"join-fork".into(),
4653
recv1,
4754
recv2,
4855
send1,
@@ -66,7 +73,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
6673
recv2 = next_recv2;
6774
}
6875

69-
df.add_subgraph_2sink(recv1, recv2, |_ctx, recv1, recv2| {
76+
df.add_subgraph_2sink("join (merge)".into(), recv1, recv2, |_ctx, recv1, recv2| {
7077
for x in recv1.take_inner() {
7178
black_box(x);
7279
}

benches/benches/identity.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,26 +154,31 @@ fn benchmark_hydroflow(c: &mut Criterion) {
154154
b.iter(|| {
155155
let mut df = Hydroflow::new();
156156

157-
let (next_send, mut next_recv) = df.make_edge::<VecHandoff<usize>>();
157+
let (next_send, mut next_recv) = df.make_edge::<VecHandoff<usize>>("end".into());
158158

159159
let mut sent = false;
160-
df.add_subgraph_source(next_send, move |_ctx, send| {
160+
df.add_subgraph_source("source".into(), next_send, move |_ctx, send| {
161161
if !sent {
162162
sent = true;
163163
send.give(Iter(0..NUM_INTS));
164164
}
165165
});
166166
for _ in 0..NUM_OPS {
167-
let (next_send, next_next_recv) = df.make_edge();
167+
let (next_send, next_next_recv) = df.make_edge("handoff".into());
168168

169-
df.add_subgraph_in_out(next_recv, next_send, |_ctx, recv, send| {
170-
send.give(Iter(recv.take_inner().into_iter()));
171-
});
169+
df.add_subgraph_in_out(
170+
"identity".into(),
171+
next_recv,
172+
next_send,
173+
|_ctx, recv, send| {
174+
send.give(Iter(recv.take_inner().into_iter()));
175+
},
176+
);
172177

173178
next_recv = next_next_recv;
174179
}
175180

176-
df.add_subgraph_sink(next_recv, |_ctx, recv| {
181+
df.add_subgraph_sink("sink".into(), next_recv, |_ctx, recv| {
177182
for x in recv.take_inner() {
178183
black_box(x);
179184
}

benches/benches/reachability.rs

Lines changed: 57 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -130,19 +130,26 @@ fn benchmark_hydroflow_scheduled(c: &mut Criterion) {
130130
let mut df = Hydroflow::new();
131131

132132
type Hoff = VecHandoff<usize>;
133-
let (reachable_out, merge_lhs) = df.make_edge::<Hoff>();
134-
let (neighbors_out, merge_rhs) = df.make_edge::<Hoff>();
135-
let (merge_out, distinct_in) = df.make_edge::<Hoff>();
136-
let (distinct_out, tee_in) = df.make_edge::<Hoff>();
137-
let (tee_out1, neighbors_in) = df.make_edge::<Hoff>();
138-
let (tee_out2, sink_in) = df.make_edge::<Hoff>();
139-
140-
df.add_subgraph_source(reachable_out, move |_ctx, send| {
141-
send.give(Some(1));
142-
});
133+
let (reachable_out, merge_lhs) =
134+
df.make_edge::<Hoff>("reachable_out -> merge_lhs".into());
135+
let (neighbors_out, merge_rhs) =
136+
df.make_edge::<Hoff>("neighbors_out -> merge_rhs".into());
137+
let (merge_out, distinct_in) = df.make_edge::<Hoff>("merge_out -> distinct_in".into());
138+
let (distinct_out, tee_in) = df.make_edge::<Hoff>("distinct_out -> tee_in".into());
139+
let (tee_out1, neighbors_in) = df.make_edge::<Hoff>("tee_out1 -> neighbors_in".into());
140+
let (tee_out2, sink_in) = df.make_edge::<Hoff>("tee_out2 -> sink_in".into());
141+
142+
df.add_subgraph_source(
143+
"initially reachable source".into(),
144+
reachable_out,
145+
move |_ctx, send| {
146+
send.give(Some(1));
147+
},
148+
);
143149

144150
let seen_handle = df.add_state::<RefCell<HashSet<usize>>>(Default::default());
145151
df.add_subgraph(
152+
"distinct".into(),
146153
tl!(distinct_in),
147154
tl!(distinct_out),
148155
move |context, tl!(recv), tl!(send)| {
@@ -156,6 +163,7 @@ fn benchmark_hydroflow_scheduled(c: &mut Criterion) {
156163
);
157164

158165
df.add_subgraph_2in_out(
166+
"merge".into(),
159167
merge_lhs,
160168
merge_rhs,
161169
merge_out,
@@ -165,24 +173,35 @@ fn benchmark_hydroflow_scheduled(c: &mut Criterion) {
165173
},
166174
);
167175

168-
df.add_subgraph_in_out(neighbors_in, neighbors_out, move |_ctx, recv, send| {
169-
for v in recv.take_inner() {
170-
if let Some(neighbors) = edges.get(&v) {
171-
send.give(Iter(neighbors.iter().copied()));
176+
df.add_subgraph_in_out(
177+
"get neighbors".into(),
178+
neighbors_in,
179+
neighbors_out,
180+
move |_ctx, recv, send| {
181+
for v in recv.take_inner() {
182+
if let Some(neighbors) = edges.get(&v) {
183+
send.give(Iter(neighbors.iter().copied()));
184+
}
172185
}
173-
}
174-
});
186+
},
187+
);
175188

176-
df.add_subgraph_in_2out(tee_in, tee_out1, tee_out2, |_ctx, recv, send1, send2| {
177-
for v in recv.take_inner() {
178-
send1.give(Some(v));
179-
send2.give(Some(v));
180-
}
181-
});
189+
df.add_subgraph_in_2out(
190+
"tee".into(),
191+
tee_in,
192+
tee_out1,
193+
tee_out2,
194+
|_ctx, recv, send1, send2| {
195+
for v in recv.take_inner() {
196+
send1.give(Some(v));
197+
send2.give(Some(v));
198+
}
199+
},
200+
);
182201

183202
let reachable_verts = Rc::new(RefCell::new(HashSet::new()));
184203
let reachable_inner = reachable_verts.clone();
185-
df.add_subgraph_sink(sink_in, move |_ctx, recv| {
204+
df.add_subgraph_sink("output sink".into(), sink_in, move |_ctx, recv| {
186205
(*reachable_inner).borrow_mut().extend(recv.take_inner());
187206
});
188207

@@ -207,17 +226,24 @@ fn benchmark_hydroflow(c: &mut Criterion) {
207226
// A dataflow that represents graph reachability.
208227
let mut df = Hydroflow::new();
209228

210-
let (reachable_out, origins_in) = df.make_edge::<VecHandoff<usize>>();
211-
let (did_reach_out, possible_reach_in) = df.make_edge::<VecHandoff<usize>>();
212-
let (output_out, sink_in) = df.make_edge::<VecHandoff<usize>>();
213-
214-
df.add_subgraph_source(reachable_out, move |_ctx, send| {
215-
send.give(Some(1));
216-
});
229+
let (reachable_out, origins_in) =
230+
df.make_edge::<VecHandoff<usize>>("reachable -> origins".into());
231+
let (did_reach_out, possible_reach_in) =
232+
df.make_edge::<VecHandoff<usize>>("did_reach -> possible_reach".into());
233+
let (output_out, sink_in) = df.make_edge::<VecHandoff<usize>>("output -> sink".into());
234+
235+
df.add_subgraph_source(
236+
"initially reachable source".into(),
237+
reachable_out,
238+
move |_ctx, send| {
239+
send.give(Some(1));
240+
},
241+
);
217242

218243
let seen_handle = df.add_state::<RefCell<HashSet<usize>>>(Default::default());
219244

220245
df.add_subgraph(
246+
"main".into(),
221247
tl!(origins_in, possible_reach_in),
222248
tl!(did_reach_out, output_out),
223249
move |context, tl!(origins, did_reach_recv), tl!(did_reach_send, output)| {
@@ -249,7 +275,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
249275

250276
let reachable_verts = Rc::new(RefCell::new(HashSet::new()));
251277
let reachable_inner = reachable_verts.clone();
252-
df.add_subgraph_sink(sink_in, move |_ctx, recv| {
278+
df.add_subgraph_sink("output sink".into(), sink_in, move |_ctx, recv| {
253279
(*reachable_inner).borrow_mut().extend(recv.take_inner());
254280
});
255281

hydroflow/examples/chat/client.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub(crate) async fn run_client(opts: Opts) {
3030
let reader = tokio::io::BufReader::new(tokio::io::stdin());
3131
let lines = tokio_stream::wrappers::LinesStream::new(reader.lines())
3232
.map(|result| Some(result.expect("Failed to read stdin as UTF-8.")));
33-
df.add_input_from_stream::<_, VecHandoff<String>, _>(lines)
33+
df.add_input_from_stream::<_, VecHandoff<String>, _>("stdin".into(), lines)
3434
};
3535

3636
// format addresses
@@ -39,10 +39,11 @@ pub(crate) async fn run_client(opts: Opts) {
3939
let messages_addr = format!("localhost:{}", messages_port);
4040

4141
// set up the flow for requesting to be a member
42+
// TODO(mingwei): use surface API instead of `wrap_input` here.
4243
let (my_info_send, my_info_recv) = df
4344
.hydroflow
44-
.make_edge::<VecHandoff<(String, MemberRequest)>>();
45-
let my_info_set = df.hydroflow.add_input(my_info_send);
45+
.make_edge::<VecHandoff<(String, MemberRequest)>>("my_info".into());
46+
let my_info_set = df.hydroflow.add_input("my_info input".into(), my_info_send);
4647
let my_info_get = df.wrap_input(my_info_recv);
4748
my_info_set.give(Some((
4849
addr,
@@ -52,7 +53,10 @@ pub(crate) async fn run_client(opts: Opts) {
5253
messages_addr,
5354
},
5455
)));
55-
df.add_subgraph(my_info_get.pull_to_push().push_to(connect_req));
56+
df.add_subgraph(
57+
"my_info 2".into(),
58+
my_info_get.pull_to_push().push_to(connect_req),
59+
);
5660

5761
let nickname = opts.name.clone();
5862
let nick2 = nickname.clone();
@@ -73,10 +77,11 @@ pub(crate) async fn run_client(opts: Opts) {
7377
})
7478
.map(Some)
7579
.push_to(messages_send);
76-
df.add_subgraph(sg);
80+
df.add_subgraph("sending messages".into(), sg);
7781

7882
// set up the flow for receiving messages
7983
df.add_subgraph(
84+
"receiving messages".into(),
8085
messages_recv
8186
.flatten()
8287
.filter(move |x| x.nickname != nick2)

hydroflow/examples/chat/server.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ pub(crate) async fn run_server(opts: Opts) {
4141

4242
// 2. feed new members into the join
4343
// But first, we need a buffer to turn push into pull for cross_join.
44-
let (memberships_push, memberships_pull) = hf.make_edge::<VecHandoff<String>, Option<String>>();
44+
let (memberships_push, memberships_pull) =
45+
hf.make_edge::<VecHandoff<String>, Option<String>>("memberships".into());
4546
// and now the other start_tee
4647
let member_join_input = hf
4748
.start_tee()
@@ -54,7 +55,7 @@ pub(crate) async fn run_server(opts: Opts) {
5455
.flatten()
5556
.pull_to_push()
5657
.tee(membership_response, member_join_input);
57-
hf.add_subgraph(sg);
58+
hf.add_subgraph("tee members".into(), sg);
5859

5960
// And assemble the cross-join of msgs_in and members_in, flowing to members_out
6061
let msgs_in = msgs_in.flatten();
@@ -75,7 +76,7 @@ pub(crate) async fn run_server(opts: Opts) {
7576
.pull_to_push()
7677
.push_to(messages_out);
7778

78-
hf.add_subgraph(sg);
79+
hf.add_subgraph("main cross-join".into(), sg);
7980

8081
let mut hf = hf.build();
8182

hydroflow/examples/covid_tracing/main.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,26 @@ fn main() {
2727

2828
let mut df = Hydroflow::new();
2929

30-
let (contacts_send, contacts_recv) = df.make_edge::<VecHandoff<(Pid, Pid, DateTime)>>();
31-
let contacts_send = df.add_channel_input(contacts_send);
30+
let (contacts_send, contacts_recv) =
31+
df.make_edge::<VecHandoff<(Pid, Pid, DateTime)>>("contacts".into());
32+
let contacts_send = df.add_channel_input("contacts input".into(), contacts_send);
33+
3234
let (diagnosed_send, diagnosed_recv) =
33-
df.make_edge::<VecHandoff<(Pid, (DateTime, DateTime))>>();
34-
let diagnosed_send = df.add_channel_input(diagnosed_send);
35-
let (people_send, people_recv) = df.make_edge::<VecHandoff<(Pid, (Name, Phone))>>();
36-
let people_send = df.add_channel_input(people_send);
35+
df.make_edge::<VecHandoff<(Pid, (DateTime, DateTime))>>("diagnosed".into());
36+
let diagnosed_send = df.add_channel_input("diagnosed input".into(), diagnosed_send);
37+
38+
let (people_send, people_recv) =
39+
df.make_edge::<VecHandoff<(Pid, (Name, Phone))>>("people".into());
40+
let people_send = df.add_channel_input("people input".into(), people_send);
3741

38-
let (loop_send, loop_recv) = df.make_edge::<VecHandoff<(Pid, DateTime)>>();
39-
let (notifs_send, notifs_recv) = df.make_edge::<VecHandoff<(Pid, DateTime)>>();
42+
let (loop_send, loop_recv) = df.make_edge::<VecHandoff<(Pid, DateTime)>>("loop".into());
43+
let (notifs_send, notifs_recv) = df.make_edge::<VecHandoff<(Pid, DateTime)>>("notifs".into());
4044

4145
type MyJoinState = RefCell<JoinState<&'static str, (usize, usize), (&'static str, usize)>>;
4246
let state_handle = df.add_state(MyJoinState::default());
4347

4448
df.add_subgraph(
49+
"main".into(),
4550
tl!(contacts_recv, diagnosed_recv, loop_recv),
4651
tl!(notifs_send, loop_send),
4752
move |context,
@@ -90,6 +95,7 @@ fn main() {
9095
let mut people_exposure = Default::default();
9196

9297
df.add_subgraph(
98+
"join people and notifs".into(),
9399
tl!(people_recv, notifs_recv),
94100
tl!(),
95101
move |_ctx, tl!(peoples, exposures), ()| {

0 commit comments

Comments
 (0)