Skip to content

Commit 5838d4a

Browse files
authored
Merge pull request #183 from bjohnson5/182-high-cpu-fix
Issue #182 Fixes a problematic loop in produce_simulation_results
2 parents a574b1e + 489e497 commit 5838d4a

File tree

1 file changed

+47
-20
lines changed

1 file changed

+47
-20
lines changed

sim-lib/src/lib.rs

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,21 @@ impl Simulation {
559559
self.run_data_collection(event_receiver, &mut tasks);
560560

561561
// Get an execution kit per activity that we need to generate and spin up consumers for each source node.
562-
let activities = self.activity_executors().await?;
562+
let activities = match self.activity_executors().await {
563+
Ok(a) => a,
564+
Err(e) => {
565+
// If we encounter an error while setting up the activity_executors,
566+
// we need to shutdown and wait for tasks to finish. We have started background tasks in the
567+
// run_data_collection function, so we should shut those down before returning.
568+
self.shutdown();
569+
while let Some(res) = tasks.join_next().await {
570+
if let Err(e) = res {
571+
log::error!("Task exited with error: {e}.");
572+
}
573+
}
574+
return Err(e);
575+
},
576+
};
563577
let consumer_channels = self.dispatch_consumers(
564578
activities
565579
.iter()
@@ -572,8 +586,24 @@ impl Simulation {
572586
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
573587
// The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
574588
let mut producer_tasks = JoinSet::new();
575-
self.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
576-
.await?;
589+
match self
590+
.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
591+
.await
592+
{
593+
Ok(_) => {},
594+
Err(e) => {
595+
// If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish.
596+
// We have started background tasks in the run_data_collection function,
597+
// so we should shut those down before returning.
598+
self.shutdown();
599+
while let Some(res) = tasks.join_next().await {
600+
if let Err(e) = res {
601+
log::error!("Task exited with error: {e}.");
602+
}
603+
}
604+
return Err(e);
605+
},
606+
}
577607

578608
// Start a task that waits for the producers to finish.
579609
// If all producers finish, then there is nothing left to do and the simulation can be shutdown.
@@ -1154,11 +1184,11 @@ async fn produce_simulation_results(
11541184
) -> Result<(), SimulationError> {
11551185
let mut set = tokio::task::JoinSet::new();
11561186

1157-
loop {
1187+
let result = loop {
11581188
tokio::select! {
11591189
biased;
11601190
_ = listener.clone() => {
1161-
return Ok(())
1191+
break Ok(())
11621192
},
11631193
output = output_receiver.recv() => {
11641194
match output {
@@ -1170,35 +1200,32 @@ async fn produce_simulation_results(
11701200
source_node.clone(), results.clone(), payment, listener.clone()
11711201
));
11721202
} else {
1173-
return Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source)));
1203+
break Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source)));
11741204
}
11751205
},
11761206
SimulationOutput::SendPaymentFailure(payment, result) => {
11771207
if results.send((payment, result.clone())).await.is_err() {
1178-
return Err(SimulationError::MpscChannelError(
1208+
break Err(SimulationError::MpscChannelError(
11791209
format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", payment.hash, payment.dispatch_time),
11801210
));
11811211
}
11821212
}
11831213
};
11841214
},
1185-
None => return Ok(())
1186-
}
1187-
},
1188-
track_payment = set.join_next() => {
1189-
if let Some(res) = track_payment {
1190-
match res {
1191-
Ok(track_payment_res) => {
1192-
track_payment_res?
1193-
},
1194-
Err(_) => {
1195-
return Err(SimulationError::TaskError);
1196-
},
1197-
}
1215+
None => break Ok(())
11981216
}
11991217
}
12001218
}
1219+
};
1220+
1221+
log::debug!("Simulation results producer exiting.");
1222+
while let Some(res) = set.join_next().await {
1223+
if let Err(e) = res {
1224+
log::error!("Simulation results producer task exited with error: {e}.");
1225+
}
12011226
}
1227+
1228+
result
12021229
}
12031230

12041231
async fn track_payment_result(

0 commit comments

Comments
 (0)