Skip to content

Commit c409c3f

Browse files
committed
Use Name: Into<Cow<'static, str>> generic to avoid .into()s
1 parent 0feeb66 commit c409c3f

File tree

21 files changed

+273
-293
lines changed

21 files changed

+273
-293
lines changed

benches/benches/fork_join.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,21 @@ fn benchmark_hydroflow(c: &mut Criterion) {
1515
b.iter(|| {
1616
let mut df = Hydroflow::new();
1717

18-
let (start_send, start_recv) = df.make_edge::<VecHandoff<usize>>("start".into());
18+
let (start_send, start_recv) = df.make_edge::<_, VecHandoff<usize>>("start");
1919

2020
let mut sent = false;
21-
df.add_subgraph_source("source".into(), start_send, move |_ctx, send| {
21+
df.add_subgraph_source("source", start_send, move |_ctx, send| {
2222
if !sent {
2323
sent = true;
2424
send.give(Iter(0..NUM_INTS));
2525
}
2626
});
2727

28-
let (send1, mut recv1) = df.make_edge::<VecHandoff<_>>("1".into());
29-
let (send2, mut recv2) = df.make_edge::<VecHandoff<_>>("2".into());
28+
let (send1, mut recv1) = df.make_edge::<_, VecHandoff<_>>("1");
29+
let (send2, mut recv2) = df.make_edge::<_, VecHandoff<_>>("2");
3030

3131
df.add_subgraph_in_2out(
32-
"fork".into(),
32+
"fork",
3333
start_recv,
3434
send1,
3535
send2,
@@ -45,11 +45,11 @@ fn benchmark_hydroflow(c: &mut Criterion) {
4545
);
4646

4747
for _ in 0..NUM_OPS {
48-
let (send1, next_recv1) = df.make_edge("1".into());
49-
let (send2, next_recv2) = df.make_edge("2".into());
48+
let (send1, next_recv1) = df.make_edge("1");
49+
let (send2, next_recv2) = df.make_edge("2");
5050

5151
df.add_subgraph_2in_2out(
52-
"join-fork".into(),
52+
"join-fork",
5353
recv1,
5454
recv2,
5555
send1,
@@ -73,7 +73,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
7373
recv2 = next_recv2;
7474
}
7575

76-
df.add_subgraph_2sink("join (merge)".into(), recv1, recv2, |_ctx, recv1, recv2| {
76+
df.add_subgraph_2sink("join (merge)", recv1, recv2, |_ctx, recv1, recv2| {
7777
for x in recv1.take_inner() {
7878
black_box(x);
7979
}

benches/benches/identity.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -154,31 +154,26 @@ fn benchmark_hydroflow(c: &mut Criterion) {
154154
b.iter(|| {
155155
let mut df = Hydroflow::new();
156156

157-
let (next_send, mut next_recv) = df.make_edge::<VecHandoff<usize>>("end".into());
157+
let (next_send, mut next_recv) = df.make_edge::<_, VecHandoff<usize>>("end");
158158

159159
let mut sent = false;
160-
df.add_subgraph_source("source".into(), next_send, move |_ctx, send| {
160+
df.add_subgraph_source("source", next_send, move |_ctx, send| {
161161
if !sent {
162162
sent = true;
163163
send.give(Iter(0..NUM_INTS));
164164
}
165165
});
166166
for _ in 0..NUM_OPS {
167-
let (next_send, next_next_recv) = df.make_edge("handoff".into());
167+
let (next_send, next_next_recv) = df.make_edge("handoff");
168168

169-
df.add_subgraph_in_out(
170-
"identity".into(),
171-
next_recv,
172-
next_send,
173-
|_ctx, recv, send| {
174-
send.give(Iter(recv.take_inner().into_iter()));
175-
},
176-
);
169+
df.add_subgraph_in_out("identity", next_recv, next_send, |_ctx, recv, send| {
170+
send.give(Iter(recv.take_inner().into_iter()));
171+
});
177172

178173
next_recv = next_next_recv;
179174
}
180175

181-
df.add_subgraph_sink("sink".into(), next_recv, |_ctx, recv| {
176+
df.add_subgraph_sink("sink", next_recv, |_ctx, recv| {
182177
for x in recv.take_inner() {
183178
black_box(x);
184179
}

benches/benches/reachability.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -130,17 +130,15 @@ fn benchmark_hydroflow_scheduled(c: &mut Criterion) {
130130
let mut df = Hydroflow::new();
131131

132132
type Hoff = VecHandoff<usize>;
133-
let (reachable_out, merge_lhs) =
134-
df.make_edge::<Hoff>("reachable_out -> merge_lhs".into());
135-
let (neighbors_out, merge_rhs) =
136-
df.make_edge::<Hoff>("neighbors_out -> merge_rhs".into());
137-
let (merge_out, distinct_in) = df.make_edge::<Hoff>("merge_out -> distinct_in".into());
138-
let (distinct_out, tee_in) = df.make_edge::<Hoff>("distinct_out -> tee_in".into());
139-
let (tee_out1, neighbors_in) = df.make_edge::<Hoff>("tee_out1 -> neighbors_in".into());
140-
let (tee_out2, sink_in) = df.make_edge::<Hoff>("tee_out2 -> sink_in".into());
133+
let (reachable_out, merge_lhs) = df.make_edge::<_, Hoff>("reachable_out -> merge_lhs");
134+
let (neighbors_out, merge_rhs) = df.make_edge::<_, Hoff>("neighbors_out -> merge_rhs");
135+
let (merge_out, distinct_in) = df.make_edge::<_, Hoff>("merge_out -> distinct_in");
136+
let (distinct_out, tee_in) = df.make_edge::<_, Hoff>("distinct_out -> tee_in");
137+
let (tee_out1, neighbors_in) = df.make_edge::<_, Hoff>("tee_out1 -> neighbors_in");
138+
let (tee_out2, sink_in) = df.make_edge::<_, Hoff>("tee_out2 -> sink_in");
141139

142140
df.add_subgraph_source(
143-
"initially reachable source".into(),
141+
"initially reachable source",
144142
reachable_out,
145143
move |_ctx, send| {
146144
send.give(Some(1));
@@ -149,7 +147,7 @@ fn benchmark_hydroflow_scheduled(c: &mut Criterion) {
149147

150148
let seen_handle = df.add_state::<RefCell<HashSet<usize>>>(Default::default());
151149
df.add_subgraph(
152-
"distinct".into(),
150+
"distinct",
153151
tl!(distinct_in),
154152
tl!(distinct_out),
155153
move |context, tl!(recv), tl!(send)| {
@@ -163,7 +161,7 @@ fn benchmark_hydroflow_scheduled(c: &mut Criterion) {
163161
);
164162

165163
df.add_subgraph_2in_out(
166-
"merge".into(),
164+
"merge",
167165
merge_lhs,
168166
merge_rhs,
169167
merge_out,
@@ -174,7 +172,7 @@ fn benchmark_hydroflow_scheduled(c: &mut Criterion) {
174172
);
175173

176174
df.add_subgraph_in_out(
177-
"get neighbors".into(),
175+
"get neighbors",
178176
neighbors_in,
179177
neighbors_out,
180178
move |_ctx, recv, send| {
@@ -187,7 +185,7 @@ fn benchmark_hydroflow_scheduled(c: &mut Criterion) {
187185
);
188186

189187
df.add_subgraph_in_2out(
190-
"tee".into(),
188+
"tee",
191189
tee_in,
192190
tee_out1,
193191
tee_out2,
@@ -201,7 +199,7 @@ fn benchmark_hydroflow_scheduled(c: &mut Criterion) {
201199

202200
let reachable_verts = Rc::new(RefCell::new(HashSet::new()));
203201
let reachable_inner = reachable_verts.clone();
204-
df.add_subgraph_sink("output sink".into(), sink_in, move |_ctx, recv| {
202+
df.add_subgraph_sink("output sink", sink_in, move |_ctx, recv| {
205203
(*reachable_inner).borrow_mut().extend(recv.take_inner());
206204
});
207205

@@ -227,13 +225,13 @@ fn benchmark_hydroflow(c: &mut Criterion) {
227225
let mut df = Hydroflow::new();
228226

229227
let (reachable_out, origins_in) =
230-
df.make_edge::<VecHandoff<usize>>("reachable -> origins".into());
228+
df.make_edge::<_, VecHandoff<usize>>("reachable -> origins");
231229
let (did_reach_out, possible_reach_in) =
232-
df.make_edge::<VecHandoff<usize>>("did_reach -> possible_reach".into());
233-
let (output_out, sink_in) = df.make_edge::<VecHandoff<usize>>("output -> sink".into());
230+
df.make_edge::<_, VecHandoff<usize>>("did_reach -> possible_reach");
231+
let (output_out, sink_in) = df.make_edge::<_, VecHandoff<usize>>("output -> sink");
234232

235233
df.add_subgraph_source(
236-
"initially reachable source".into(),
234+
"initially reachable source",
237235
reachable_out,
238236
move |_ctx, send| {
239237
send.give(Some(1));
@@ -243,7 +241,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
243241
let seen_handle = df.add_state::<RefCell<HashSet<usize>>>(Default::default());
244242

245243
df.add_subgraph(
246-
"main".into(),
244+
"main",
247245
tl!(origins_in, possible_reach_in),
248246
tl!(did_reach_out, output_out),
249247
move |context, tl!(origins, did_reach_recv), tl!(did_reach_send, output)| {
@@ -275,7 +273,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
275273

276274
let reachable_verts = Rc::new(RefCell::new(HashSet::new()));
277275
let reachable_inner = reachable_verts.clone();
278-
df.add_subgraph_sink("output sink".into(), sink_in, move |_ctx, recv| {
276+
df.add_subgraph_sink("output sink", sink_in, move |_ctx, recv| {
279277
(*reachable_inner).borrow_mut().extend(recv.take_inner());
280278
});
281279

hydroflow/examples/chat/client.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub(crate) async fn run_client(opts: Opts) {
3030
let reader = tokio::io::BufReader::new(tokio::io::stdin());
3131
let lines = tokio_stream::wrappers::LinesStream::new(reader.lines())
3232
.map(|result| Some(result.expect("Failed to read stdin as UTF-8.")));
33-
df.add_input_from_stream::<_, VecHandoff<String>, _>("stdin".into(), lines)
33+
df.add_input_from_stream::<_, _, VecHandoff<String>, _>("stdin", lines)
3434
};
3535

3636
// format addresses
@@ -42,8 +42,8 @@ pub(crate) async fn run_client(opts: Opts) {
4242
// TODO(mingwei): use surface API instead of `wrap_input` here.
4343
let (my_info_send, my_info_recv) = df
4444
.hydroflow
45-
.make_edge::<VecHandoff<(String, MemberRequest)>>("my_info".into());
46-
let my_info_set = df.hydroflow.add_input("my_info input".into(), my_info_send);
45+
.make_edge::<_, VecHandoff<(String, MemberRequest)>>("my_info");
46+
let my_info_set = df.hydroflow.add_input("my_info input", my_info_send);
4747
let my_info_get = df.wrap_input(my_info_recv);
4848
my_info_set.give(Some((
4949
addr,
@@ -53,10 +53,7 @@ pub(crate) async fn run_client(opts: Opts) {
5353
messages_addr,
5454
},
5555
)));
56-
df.add_subgraph(
57-
"my_info 2".into(),
58-
my_info_get.pull_to_push().push_to(connect_req),
59-
);
56+
df.add_subgraph("my_info 2", my_info_get.pull_to_push().push_to(connect_req));
6057

6158
let nickname = opts.name.clone();
6259
let nick2 = nickname.clone();
@@ -77,11 +74,11 @@ pub(crate) async fn run_client(opts: Opts) {
7774
})
7875
.map(Some)
7976
.push_to(messages_send);
80-
df.add_subgraph("sending messages".into(), sg);
77+
df.add_subgraph("sending messages", sg);
8178

8279
// set up the flow for receiving messages
8380
df.add_subgraph(
84-
"receiving messages".into(),
81+
"receiving messages",
8582
messages_recv
8683
.flatten()
8784
.filter(move |x| x.nickname != nick2)

hydroflow/examples/chat/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub(crate) async fn run_server(opts: Opts) {
4242
// 2. feed new members into the join
4343
// But first, we need a buffer to turn push into pull for cross_join.
4444
let (memberships_push, memberships_pull) =
45-
hf.make_edge::<VecHandoff<String>, Option<String>>("memberships".into());
45+
hf.make_edge::<_, VecHandoff<String>, Option<String>>("memberships");
4646
// and now the other start_tee
4747
let member_join_input = hf
4848
.start_tee()
@@ -55,7 +55,7 @@ pub(crate) async fn run_server(opts: Opts) {
5555
.flatten()
5656
.pull_to_push()
5757
.tee(membership_response, member_join_input);
58-
hf.add_subgraph("tee members".into(), sg);
58+
hf.add_subgraph("tee members", sg);
5959

6060
// And assemble the cross-join of msgs_in and members_in, flowing to members_out
6161
let msgs_in = msgs_in.flatten();
@@ -76,7 +76,7 @@ pub(crate) async fn run_server(opts: Opts) {
7676
.pull_to_push()
7777
.push_to(messages_out);
7878

79-
hf.add_subgraph("main cross-join".into(), sg);
79+
hf.add_subgraph("main cross-join", sg);
8080

8181
let mut hf = hf.build();
8282

hydroflow/examples/covid_tracing/main.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,24 @@ fn main() {
2828
let mut df = Hydroflow::new();
2929

3030
let (contacts_send, contacts_recv) =
31-
df.make_edge::<VecHandoff<(Pid, Pid, DateTime)>>("contacts".into());
32-
let contacts_send = df.add_channel_input("contacts input".into(), contacts_send);
31+
df.make_edge::<_, VecHandoff<(Pid, Pid, DateTime)>>("contacts");
32+
let contacts_send = df.add_channel_input("contacts input", contacts_send);
3333

3434
let (diagnosed_send, diagnosed_recv) =
35-
df.make_edge::<VecHandoff<(Pid, (DateTime, DateTime))>>("diagnosed".into());
36-
let diagnosed_send = df.add_channel_input("diagnosed input".into(), diagnosed_send);
35+
df.make_edge::<_, VecHandoff<(Pid, (DateTime, DateTime))>>("diagnosed");
36+
let diagnosed_send = df.add_channel_input("diagnosed input", diagnosed_send);
3737

38-
let (people_send, people_recv) =
39-
df.make_edge::<VecHandoff<(Pid, (Name, Phone))>>("people".into());
40-
let people_send = df.add_channel_input("people input".into(), people_send);
38+
let (people_send, people_recv) = df.make_edge::<_, VecHandoff<(Pid, (Name, Phone))>>("people");
39+
let people_send = df.add_channel_input("people input", people_send);
4140

42-
let (loop_send, loop_recv) = df.make_edge::<VecHandoff<(Pid, DateTime)>>("loop".into());
43-
let (notifs_send, notifs_recv) = df.make_edge::<VecHandoff<(Pid, DateTime)>>("notifs".into());
41+
let (loop_send, loop_recv) = df.make_edge::<_, VecHandoff<(Pid, DateTime)>>("loop");
42+
let (notifs_send, notifs_recv) = df.make_edge::<_, VecHandoff<(Pid, DateTime)>>("notifs");
4443

4544
type MyJoinState = RefCell<JoinState<&'static str, (usize, usize), (&'static str, usize)>>;
4645
let state_handle = df.add_state(MyJoinState::default());
4746

4847
df.add_subgraph(
49-
"main".into(),
48+
"main",
5049
tl!(contacts_recv, diagnosed_recv, loop_recv),
5150
tl!(notifs_send, loop_send),
5251
move |context,
@@ -95,7 +94,7 @@ fn main() {
9594
let mut people_exposure = Default::default();
9695

9796
df.add_subgraph(
98-
"join people and notifs".into(),
97+
"join people and notifs",
9998
tl!(people_recv, notifs_recv),
10099
tl!(),
101100
move |_ctx, tl!(peoples, exposures), ()| {

hydroflow/examples/covid_tracing_dist/database.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,21 @@ pub(crate) async fn run_database(opts: Opts) {
1717

1818
let mut df = Hydroflow::new();
1919

20-
let (notifs, notif_sink) = df.make_edge::<VecHandoff<(String, usize)>>("notifs".into());
20+
let (notifs, notif_sink) = df.make_edge::<_, VecHandoff<(String, usize)>>("notifs");
2121
let (encode_contacts_out, contacts_merge) =
22-
df.make_edge::<VecHandoff<Message>>("encoded contacts".into());
22+
df.make_edge::<_, VecHandoff<Message>>("encoded contacts");
2323
let (encode_diagnoses_out, diagnoses_merge) =
24-
df.make_edge::<VecHandoff<Message>>("encoded diagnoses".into());
24+
df.make_edge::<_, VecHandoff<Message>>("encoded diagnoses");
2525

2626
let (contacts_send, contacts_recv) =
27-
df.make_edge::<VecHandoff<(&'static str, &'static str, usize)>>("contacts".into());
28-
let contacts_send = df.add_channel_input("contacts input".into(), contacts_send);
27+
df.make_edge::<_, VecHandoff<(&'static str, &'static str, usize)>>("contacts");
28+
let contacts_send = df.add_channel_input("contacts input", contacts_send);
2929
let (diagnosed_send, diagnosed_recv) =
30-
df.make_edge::<VecHandoff<(&'static str, (usize, usize))>>("diagnosed".into());
31-
let diagnosed_send = df.add_channel_input("diagnosed input".into(), diagnosed_send);
30+
df.make_edge::<_, VecHandoff<(&'static str, (usize, usize))>>("diagnosed");
31+
let diagnosed_send = df.add_channel_input("diagnosed input", diagnosed_send);
3232
let (people_send, people_recv) =
33-
df.make_edge::<VecHandoff<(String, (String, String))>>("people".into());
34-
let people_send = df.add_channel_input("people input".into(), people_send);
33+
df.make_edge::<_, VecHandoff<(String, (String, String))>>("people");
34+
let people_send = df.add_channel_input("people input", people_send);
3535

3636
let stream = TcpListener::bind(format!("localhost:{}", opts.port))
3737
.await
@@ -41,7 +41,7 @@ pub(crate) async fn run_database(opts: Opts) {
4141
let (network_in, network_out) = df.add_tcp_stream(stream);
4242

4343
df.add_subgraph_in_out(
44-
"decode messages".into(),
44+
"decode messages",
4545
network_out,
4646
notifs,
4747
|_ctx, recv, send| {
@@ -89,7 +89,7 @@ pub(crate) async fn run_database(opts: Opts) {
8989
});
9090

9191
df.add_subgraph_2in_out(
92-
"merge contacts and diagnoses".into(),
92+
"merge contacts and diagnoses",
9393
contacts_merge,
9494
diagnoses_merge,
9595
network_in,
@@ -101,7 +101,7 @@ pub(crate) async fn run_database(opts: Opts) {
101101
);
102102

103103
df.add_subgraph_in_out(
104-
"encode contacts".into(),
104+
"encode contacts",
105105
contacts_recv,
106106
encode_contacts_out,
107107
|_ctx, recv, send| {
@@ -115,7 +115,7 @@ pub(crate) async fn run_database(opts: Opts) {
115115
);
116116

117117
df.add_subgraph_in_out(
118-
"encode diagnoses".into(),
118+
"encode diagnoses",
119119
diagnosed_recv,
120120
encode_diagnoses_out,
121121
|_ctx, recv, send| {
@@ -130,7 +130,7 @@ pub(crate) async fn run_database(opts: Opts) {
130130

131131
let mut join_state = Default::default();
132132
df.add_subgraph(
133-
"join people and notifs".into(),
133+
"join people and notifs",
134134
tl!(notif_sink, people_recv),
135135
tl!(),
136136
move |_ctx, tl!(notifs, people), tl!()| {

0 commit comments

Comments
 (0)