@@ -559,7 +559,21 @@ impl Simulation {
559
559
self . run_data_collection ( event_receiver, & mut tasks) ;
560
560
561
561
// Get an execution kit per activity that we need to generate and spin up consumers for each source node.
562
- let activities = self . activity_executors ( ) . await ?;
562
+ let activities = match self . activity_executors ( ) . await {
563
+ Ok ( a) => a,
564
+ Err ( e) => {
565
+ // If we encounter an error while setting up the activity_executors,
566
+ // we need to shutdown and wait for tasks to finish. We have started background tasks in the
567
+ // run_data_collection function, so we should shut those down before returning.
568
+ self . shutdown ( ) ;
569
+ while let Some ( res) = tasks. join_next ( ) . await {
570
+ if let Err ( e) = res {
571
+ log:: error!( "Task exited with error: {e}." ) ;
572
+ }
573
+ }
574
+ return Err ( e) ;
575
+ } ,
576
+ } ;
563
577
let consumer_channels = self . dispatch_consumers (
564
578
activities
565
579
. iter ( )
@@ -572,8 +586,24 @@ impl Simulation {
572
586
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
573
587
// The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
574
588
let mut producer_tasks = JoinSet :: new ( ) ;
575
- self . dispatch_producers ( activities, consumer_channels, & mut producer_tasks)
576
- . await ?;
589
+ match self
590
+ . dispatch_producers ( activities, consumer_channels, & mut producer_tasks)
591
+ . await
592
+ {
593
+ Ok ( _) => { } ,
594
+ Err ( e) => {
595
+ // If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish.
596
+ // We have started background tasks in the run_data_collection function,
597
+ // so we should shut those down before returning.
598
+ self . shutdown ( ) ;
599
+ while let Some ( res) = tasks. join_next ( ) . await {
600
+ if let Err ( e) = res {
601
+ log:: error!( "Task exited with error: {e}." ) ;
602
+ }
603
+ }
604
+ return Err ( e) ;
605
+ } ,
606
+ }
577
607
578
608
// Start a task that waits for the producers to finish.
579
609
// If all producers finish, then there is nothing left to do and the simulation can be shutdown.
@@ -1158,7 +1188,7 @@ async fn produce_simulation_results(
1158
1188
tokio:: select! {
1159
1189
biased;
1160
1190
_ = listener. clone( ) => {
1161
- return Ok ( ( ) )
1191
+ break ;
1162
1192
} ,
1163
1193
output = output_receiver. recv( ) => {
1164
1194
match output {
@@ -1184,21 +1214,17 @@ async fn produce_simulation_results(
1184
1214
} ,
1185
1215
None => return Ok ( ( ) )
1186
1216
}
1187
- } ,
1188
- track_payment = set. join_next( ) => {
1189
- if let Some ( res) = track_payment {
1190
- match res {
1191
- Ok ( track_payment_res) => {
1192
- track_payment_res?
1193
- } ,
1194
- Err ( _) => {
1195
- return Err ( SimulationError :: TaskError ) ;
1196
- } ,
1197
- }
1198
- }
1199
1217
}
1200
1218
}
1201
1219
}
1220
+
1221
+ log:: debug!( "Simulation results producer exiting." ) ;
1222
+ while let Some ( res) = set. join_next ( ) . await {
1223
+ if let Err ( e) = res {
1224
+ log:: error!( "Simulation results producer task exited with error: {e}." ) ;
1225
+ }
1226
+ }
1227
+ Ok ( ( ) )
1202
1228
}
1203
1229
1204
1230
async fn track_payment_result (
0 commit comments