@@ -50,7 +50,7 @@ impl NodeId {
50
50
crate :: NodeId :: PublicKey ( pk) => {
51
51
if pk != node_id {
52
52
return Err ( LightningError :: ValidationError ( format ! (
53
- "the provided node id does not match the one returned by the backend ({} != {})." ,
53
+ "The provided node id does not match the one returned by the backend ({} != {})." ,
54
54
pk, node_id
55
55
) ) ) ;
56
56
}
@@ -139,6 +139,12 @@ pub struct ActivityParser {
139
139
/// The destination of the payment.
140
140
#[ serde( with = "serializers::serde_node_id" ) ]
141
141
pub destination : NodeId ,
142
+ /// The time in the simulation to start the payment.
143
+ #[ serde( default ) ]
144
+ pub start_secs : u16 ,
145
+ /// The number of payments to send over the course of the simulation.
146
+ #[ serde( default ) ]
147
+ pub count : Option < u64 > ,
142
148
/// The interval of the event, as in every how many seconds the payment is performed.
143
149
pub interval_secs : u16 ,
144
150
/// The amount of m_sat to used in this payment.
@@ -153,6 +159,10 @@ pub struct ActivityDefinition {
153
159
pub source : NodeInfo ,
154
160
/// The destination of the payment.
155
161
pub destination : NodeInfo ,
162
+ /// The time in the simulation to start the payment.
163
+ pub start_secs : u16 ,
164
+ /// The number of payments to send over the course of the simulation.
165
+ pub count : Option < u64 > ,
156
166
/// The interval of the event, as in every how many seconds the payment is performed.
157
167
pub interval_secs : u16 ,
158
168
/// The amount of m_sat to used in this payment.
@@ -261,6 +271,12 @@ pub trait DestinationGenerator: Send {
261
271
pub struct PaymentGenerationError ( String ) ;
262
272
263
273
pub trait PaymentGenerator : Display + Send {
274
+ /// Returns the time that the payments should start
275
+ fn payment_start ( & self ) -> Duration ;
276
+
277
+ /// Returns the number of payments that should be made
278
+ fn payment_count ( & self ) -> Option < u64 > ;
279
+
264
280
/// Returns the number of seconds that a node should wait until firing its next payment.
265
281
fn next_payment_wait ( & self ) -> time:: Duration ;
266
282
@@ -554,9 +570,25 @@ impl Simulation {
554
570
) ;
555
571
556
572
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
557
- self . dispatch_producers ( activities, consumer_channels, & mut tasks)
573
+ // The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
574
+ let mut producer_tasks = JoinSet :: new ( ) ;
575
+ self . dispatch_producers ( activities, consumer_channels, & mut producer_tasks)
558
576
. await ?;
559
577
578
+ // Start a task that waits for the producers to finish.
579
+ // If all producers finish, then there is nothing left to do and the simulation can be shutdown.
580
+ let producer_trigger = self . shutdown_trigger . clone ( ) ;
581
+ tasks. spawn ( async move {
582
+ while let Some ( res) = producer_tasks. join_next ( ) . await {
583
+ if let Err ( e) = res {
584
+ log:: error!( "Producer exited with error: {e}." ) ;
585
+ }
586
+ }
587
+ log:: info!( "All producers finished. Shutting down." ) ;
588
+ producer_trigger. trigger ( )
589
+ } ) ;
590
+
591
+ // Start a task that will shutdown the simulation if the total_time is met.
560
592
if let Some ( total_time) = self . total_time {
561
593
let t = self . shutdown_trigger . clone ( ) ;
562
594
let l = self . shutdown_listener . clone ( ) ;
@@ -639,7 +671,7 @@ impl Simulation {
639
671
// csr: consume simulation results
640
672
let csr_write_results = self . write_results . clone ( ) ;
641
673
tasks. spawn ( async move {
642
- log:: debug!( "Staring simulation results consumer." ) ;
674
+ log:: debug!( "Starting simulation results consumer." ) ;
643
675
if let Err ( e) = consume_simulation_results (
644
676
result_logger,
645
677
results_receiver,
@@ -667,6 +699,8 @@ impl Simulation {
667
699
for description in self . activity . iter ( ) {
668
700
let activity_generator = DefinedPaymentActivity :: new (
669
701
description. destination . clone ( ) ,
702
+ Duration :: from_secs ( description. start_secs . into ( ) ) ,
703
+ description. count ,
670
704
Duration :: from_secs ( description. interval_secs . into ( ) ) ,
671
705
description. amount_msat ,
672
706
) ;
@@ -777,9 +811,9 @@ impl Simulation {
777
811
consume_events ( ce_node, receiver, ce_output_sender, ce_listener) . await
778
812
{
779
813
ce_shutdown. trigger ( ) ;
780
- log:: error!( "Event consumer exited with error: {e:?}." ) ;
814
+ log:: error!( "Event consumer for node {node_info} exited with error: {e:?}." ) ;
781
815
} else {
782
- log:: debug!( "Event consumer for node {node_info} received shutdown signal ." ) ;
816
+ log:: debug!( "Event consumer for node {node_info} completed successfully ." ) ;
783
817
}
784
818
} ) ;
785
819
}
@@ -826,9 +860,9 @@ impl Simulation {
826
860
. await
827
861
{
828
862
pe_shutdown. trigger ( ) ;
829
- log:: debug!( "Event producer exited with error {e}." ) ;
863
+ log:: debug!( "Activity producer for {source} exited with error {e}." ) ;
830
864
} else {
831
- log:: debug!( "Random activity generator for {source} received shutdown signal ." ) ;
865
+ log:: debug!( "Activity producer for {source} completed successfully ." ) ;
832
866
}
833
867
} ) ;
834
868
}
@@ -918,9 +952,33 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
918
952
sender : Sender < SimulationEvent > ,
919
953
listener : Listener ,
920
954
) -> Result < ( ) , SimulationError > {
955
+ let mut current_count = 0 ;
921
956
loop {
922
- let wait = node_generator. next_payment_wait ( ) ;
923
- log:: debug!( "Next payment for {source} in {:?} seconds." , wait) ;
957
+ if let Some ( c) = node_generator. payment_count ( ) {
958
+ if c == current_count {
959
+ log:: info!(
960
+ "Payment count has been met for {source}: {c} payments. Stopping the activity."
961
+ ) ;
962
+ return Ok ( ( ) ) ;
963
+ }
964
+ }
965
+
966
+ let wait: Duration = if current_count == 0 {
967
+ let start = node_generator. payment_start ( ) ;
968
+ if start != Duration :: from_secs ( 0 ) {
969
+ log:: debug!(
970
+ "Using a start delay. The first payment for {source} will be at {:?}." ,
971
+ start
972
+ ) ;
973
+ }
974
+ start
975
+ } else {
976
+ log:: debug!(
977
+ "Next payment for {source} in {:?}." ,
978
+ node_generator. next_payment_wait( )
979
+ ) ;
980
+ node_generator. next_payment_wait ( )
981
+ } ;
924
982
925
983
select ! {
926
984
biased;
@@ -948,14 +1006,15 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
948
1006
} ,
949
1007
} ;
950
1008
951
- log:: debug!( "Generated random payment: {source} -> {}: {amount} msat." , destination) ;
1009
+ log:: debug!( "Generated payment: {source} -> {}: {amount} msat." , destination) ;
952
1010
953
1011
// Send the payment, exiting if we can no longer send to the consumer.
954
1012
let event = SimulationEvent :: SendPayment ( destination. clone( ) , amount) ;
955
1013
if sender. send( event. clone( ) ) . await . is_err( ) {
956
- return Err ( SimulationError :: MpscChannelError ( format!( "Stopped random producer for {amount}: {source} -> {destination}." ) ) ) ;
1014
+ return Err ( SimulationError :: MpscChannelError ( format!( "Stopped activity producer for {amount}: {source} -> {destination}." ) ) ) ;
957
1015
}
958
1016
1017
+ current_count += 1 ;
959
1018
} ,
960
1019
}
961
1020
}
0 commit comments