@@ -139,10 +139,10 @@ pub struct ActivityParser {
139
139
/// The destination of the payment.
140
140
#[ serde( with = "serializers::serde_node_id" ) ]
141
141
pub destination : NodeId ,
142
- /// The time in the simulation to start the payment
142
+ /// The time in the simulation to start the payment.
143
143
#[ serde( default ) ]
144
144
pub start_secs : u16 ,
145
- /// The number of payments to send over the course of the simulation
145
+ /// The number of payments to send over the course of the simulation.
146
146
#[ serde( default ) ]
147
147
pub count : Option < u64 > ,
148
148
/// The interval of the event, as in every how many seconds the payment is performed.
@@ -159,9 +159,9 @@ pub struct ActivityDefinition {
159
159
pub source : NodeInfo ,
160
160
/// The destination of the payment.
161
161
pub destination : NodeInfo ,
162
- /// The time in the simulation to start the payment
162
+ /// The time in the simulation to start the payment.
163
163
pub start_secs : u16 ,
164
- /// The number of payments to send over the course of the simulation
164
+ /// The number of payments to send over the course of the simulation.
165
165
pub count : Option < u64 > ,
166
166
/// The interval of the event, as in every how many seconds the payment is performed.
167
167
pub interval_secs : u16 ,
@@ -570,9 +570,25 @@ impl Simulation {
570
570
) ;
571
571
572
572
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
573
- self . dispatch_producers ( activities, consumer_channels, & mut tasks)
573
+ // The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
574
+ let mut producer_tasks = JoinSet :: new ( ) ;
575
+ self . dispatch_producers ( activities, consumer_channels, & mut producer_tasks)
574
576
. await ?;
575
577
578
+ // Start a task that waits for the producers to finish.
579
+ // If all producers finish, then there is nothing left to do and the simulation can be shutdown.
580
+ let producer_trigger = self . shutdown_trigger . clone ( ) ;
581
+ tasks. spawn ( async move {
582
+ while let Some ( res) = producer_tasks. join_next ( ) . await {
583
+ if let Err ( e) = res {
584
+ log:: error!( "Producer exited with error: {e}." ) ;
585
+ }
586
+ }
587
+ log:: info!( "All producers finished. Shutting down." ) ;
588
+ producer_trigger. trigger ( )
589
+ } ) ;
590
+
591
+ // Start a task that will shutdown the simulation if the total_time is met.
576
592
if let Some ( total_time) = self . total_time {
577
593
let t = self . shutdown_trigger . clone ( ) ;
578
594
let l = self . shutdown_listener . clone ( ) ;
@@ -948,6 +964,10 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
948
964
}
949
965
950
966
let wait: Duration = if current_count == 0 {
967
+ log:: debug!(
968
+ "Using a start delay. The first payment for {source} will be at {:?} seconds." ,
969
+ node_generator. payment_start( )
970
+ ) ;
951
971
node_generator. payment_start ( )
952
972
} else {
953
973
node_generator. next_payment_wait ( )
0 commit comments