
Commit b22abe4

sim-lib: select on sending channel and shutdown listener
1 parent 3b1bb18 commit b22abe4

2 files changed: +48 -17 lines

docs/ARCHITECTURE.md (12 additions, 8 deletions)

@@ -63,16 +63,20 @@ simulation, we relay on the following:
    and a `Listener` that propagates this signal.
 2. The (`Trigger`, `Listener`) pair are used with channels: if a channel
    errors out across `send()` or `recv()`, shutdown is triggered. There is
-   no reliance on channel mechanics, i.e. errors generated when all senders
-   are and/or a receiver is dropped.
+   no reliance on channel mechanics, i.e. that receiving channels will error
+   out when all of their sending channels are dropped.
 3. All events are handled in a `tokio::select` to allow waiting on
    multiple asynchronous tasks at once. These selects should be `biased`
    on the exit case (ie, the `Listener` being triggered) so that we
    prioritize exit above generating more events.
 4. Additionally, we `select!` on shutdown signal on `send()`/`recv()`
-   for all channels to guarantee this:
-   - A task's receiver exiting while one or more corresponding senders
-     (in different tasks) are actively sending, doesn't result in the
-     sending tasks erroring due to channel `SendError`. Any sender's
-     inability to `send()` due to a dropped receiver triggers a clean
-     shutdown across all listening tasks.
+   for all channels to guarantee shutdown:
+   - When the signal to shutdown is received, it is possible that a
+     task responsible for consuming on the `Receiver` channel exits when
+     multiple tasks are still attempting to send to it.
+   - By using `select` with all `send()` instructions, we ensure that
+     the senders will exit cleanly, rather than block waiting on a
+     receiver that has already exited to consume its send.
+   - An alternative to this approach would be to use `receiver.close()`
+     and drain all items from the channel (resulting in unblocking the
+     senders).
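
As a rough illustration of the pattern described in points 3 and 4 above (not code from this commit), the following self-contained sketch wraps a `send()` in a `biased` `select!` against a shutdown `Listener`. It assumes the `triggered` crate for the (`Trigger`, `Listener`) pair and a bounded tokio mpsc channel; the `produce` task name, the `u32` payload and the channel capacity are illustrative only:

// Assumed dependencies: tokio = { version = "1", features = ["full"] },
// triggered = "0.1".
use tokio::{select, sync::mpsc};

async fn produce(sender: mpsc::Sender<u32>, listener: triggered::Listener) {
    for event in 0..10u32 {
        select! {
            // Biased towards the exit case so that shutdown takes priority
            // over generating (or delivering) more events.
            biased;
            _ = listener.clone() => return,
            send_result = sender.send(event) => {
                // A failed send means the receiver is gone: exit cleanly
                // rather than blocking or treating it as fatal here.
                if send_result.is_err() {
                    return;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (trigger, listener) = triggered::trigger();
    let (sender, mut receiver) = mpsc::channel(1);

    let producer = tokio::spawn(produce(sender, listener));

    // Consume a single event, then signal shutdown and drop the receiver;
    // the producer unblocks via the listener arm instead of hanging on a
    // full channel with no consumer.
    let _ = receiver.recv().await;
    trigger.trigger();
    drop(receiver);

    producer.await.unwrap();
}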

sim-lib/src/lib.rs (36 additions, 9 deletions)

@@ -1053,8 +1053,17 @@ async fn consume_events(
             }
         };
 
-        if sender.send(outcome.clone()).await.is_err() {
-            return Err(SimulationError::MpscChannelError(format!("Error sending simulation output {outcome:?}.")));
+        select!{
+            biased;
+            _ = listener.clone() => {
+                return Ok(())
+            }
+            send_result = sender.send(outcome.clone()) => {
+                if send_result.is_err() {
+                    return Err(SimulationError::MpscChannelError(
+                        format!("Error sending simulation output {outcome:?}.")));
+                }
+            }
         }
     }
 }

@@ -1118,8 +1127,17 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
 
         // Send the payment, exiting if we can no longer send to the consumer.
         let event = SimulationEvent::SendPayment(destination.clone(), amount);
-        if sender.send(event.clone()).await.is_err() {
-            return Err(SimulationError::MpscChannelError (format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
+        select!{
+            biased;
+            _ = listener.clone() => {
+                return Ok(());
+            },
+            send_result = sender.send(event.clone()) => {
+                if send_result.is_err(){
+                    return Err(SimulationError::MpscChannelError(
+                        format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
+                }
+            },
         }
 
         current_count += 1;

@@ -1314,10 +1332,18 @@ async fn produce_simulation_results(
                     }
                 },
                 SimulationOutput::SendPaymentFailure(payment, result) => {
-                    if results.send((payment, result.clone())).await.is_err() {
-                        break Err(SimulationError::MpscChannelError(
-                            format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", payment.hash, payment.dispatch_time),
-                        ));
+                    select!{
+                        _ = listener.clone() => {
+                            return Ok(());
+                        },
+                        send_result = results.send((payment, result.clone())) => {
+                            if send_result.is_err(){
+                                break Err(SimulationError::MpscChannelError(
+                                    format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.",
+                                        payment.hash, payment.dispatch_time),
+                                ));
+                            }
+                        },
                     }
                 }
             };

@@ -1384,7 +1410,8 @@ async fn track_payment_result(
             },
             send_payment_result = results.send((payment, res.clone())) => {
                 if send_payment_result.is_err() {
-                    return Err(SimulationError::MpscChannelError(format!("Failed to send payment result {res} for payment {payment}.")))
+                    return Err(SimulationError::MpscChannelError(
+                        format!("Failed to send payment result {res} for payment {payment}.")))
                 }
             }
         }
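
The last bullet added to ARCHITECTURE.md above mentions an alternative to selecting on every `send()`: have the consumer call `receiver.close()` and drain whatever is already buffered, which also unblocks any senders waiting on a full channel. A hedged sketch of what that could look like with a tokio mpsc receiver (the `drain_on_shutdown` helper and `u32` payload are hypothetical, not part of this commit):

use tokio::sync::mpsc;

// Hypothetical helper, not from this commit: close the receiving half so no
// new messages can be queued, then drain what is already buffered. Senders
// blocked in send().await are released and their sends return an error.
async fn drain_on_shutdown(mut receiver: mpsc::Receiver<u32>) {
    receiver.close();

    // recv() keeps yielding the buffered items and returns None once the
    // closed channel is empty.
    while let Some(item) = receiver.recv().await {
        // Handle or discard the late item as appropriate.
        let _ = item;
    }
}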
