Skip to content

Commit 01c31ec

Browse files
committed
sim-lib/refactor: use single dispatch for random and define activities
1 parent 7510bbc commit 01c31ec

File tree

1 file changed

+106
-107
lines changed

1 file changed

+106
-107
lines changed

sim-lib/src/lib.rs

Lines changed: 106 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ pub trait LightningNode {
203203
async fn list_channels(&mut self) -> Result<Vec<u64>, LightningError>;
204204
}
205205

206-
pub trait DestinationGenerator {
206+
pub trait DestinationGenerator: Send {
207207
// choose_destination picks a destination node within the network, returning the node's information and its
208208
// capacity (if available).
209209
fn choose_destination(&self, source: PublicKey) -> (NodeInfo, Option<u64>);
@@ -212,7 +212,8 @@ pub trait DestinationGenerator {
212212
#[derive(Debug, Error)]
213213
#[error("Payment generation error: {0}")]
214214
pub struct PaymentGenerationError(String);
215-
pub trait PaymentGenerator {
215+
216+
pub trait PaymentGenerator: Display + Send {
216217
// Returns the number of seconds that a node should wait until firing its next payment.
217218
fn next_payment_wait(&self) -> time::Duration;
218219

@@ -348,6 +349,16 @@ pub struct WriteResults {
348349
/// The number of activity results to batch before printing in CSV.
349350
pub batch_size: u32,
350351
}
352+
///
353+
/// ExecutorKit contains the components required to spin up an activity configured by the user, to be used to
354+
/// spin up the appropriate producers and consumers for the activity.
355+
struct ExecutorKit {
356+
source_info: NodeInfo,
357+
/// We use an arc mutex here because some implementations of the trait will be very expensive to clone.
358+
/// See [NetworkGraphView] for details.
359+
network_generator: Arc<Mutex<dyn DestinationGenerator>>,
360+
payment_generator: Box<dyn PaymentGenerator>,
361+
}
351362

352363
impl Simulation {
353364
pub fn new(
@@ -472,39 +483,20 @@ impl Simulation {
472483
let (event_sender, event_receiver) = channel(1);
473484
self.run_data_collection(event_receiver, &mut tasks);
474485

475-
// Create consumers for every source node when dealing with activity descriptions, or only for nodes with
476-
// sufficient capacity if generating random activity. Since we have to query the capacity of every node
477-
// in our network for picking random activity nodes, we cache this value here to be used later when we spin
478-
// up producers.
479-
let mut random_activity_nodes = HashMap::new();
480-
let collecting_nodes = if !self.activity.is_empty() {
481-
self.activity
486+
// Get an execution kit per activity that we need to generate and spin up consumers for each source node.
487+
let activities = self.activity_executors().await?;
488+
let consumer_channels = self.dispatch_consumers(
489+
activities
482490
.iter()
483-
.map(|activity| activity.source.pubkey)
484-
.collect()
485-
} else {
486-
random_activity_nodes.extend(self.random_activity_nodes().await?);
487-
random_activity_nodes.keys().cloned().collect()
488-
};
491+
.map(|generator| generator.source_info.pubkey)
492+
.collect(),
493+
event_sender.clone(),
494+
&mut tasks,
495+
);
489496

490-
let consumer_channels =
491-
self.dispatch_consumers(collecting_nodes, event_sender.clone(), &mut tasks);
492-
493-
// Next, we'll spin up our actual activity generator that will be responsible for triggering the activity that
494-
// has been configured (if any), passing in the channel that is used to notify data collection that events have
495-
// been generated. Alternatively, we'll generate random activity if there is no activity specified.
496-
if !self.activity.is_empty() {
497-
self.dispatch_activity_producers(consumer_channels, &mut tasks)
498-
.await;
499-
} else {
500-
log::info!(
501-
"Generating random activity with multiplier: {}, average payment amount: {}.",
502-
self.activity_multiplier,
503-
self.expected_payment_msat
504-
);
505-
self.dispatch_random_producers(random_activity_nodes, consumer_channels, &mut tasks)
506-
.await?;
507-
}
497+
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
498+
self.dispatch_producers(activities, consumer_channels, &mut tasks)
499+
.await?;
508500

509501
if let Some(total_time) = self.total_time {
510502
let t = self.shutdown_trigger.clone();
@@ -576,14 +568,45 @@ impl Simulation {
576568
log::debug!("Simulator data collection set up.");
577569
}
578570

571+
async fn activity_executors(&self) -> Result<Vec<ExecutorKit>, SimulationError> {
572+
let mut generators = Vec::new();
573+
574+
// Note: when we allow configuring both defined and random activity, this will no longer be an if/else, we'll
575+
// just populate with each type as configured.
576+
if !self.activity.is_empty() {
577+
for description in self.activity.iter() {
578+
let activity_generator = DefinedPaymentActivity::new(
579+
description.destination.clone(),
580+
Duration::from_secs(description.interval_secs.into()),
581+
description.amount_msat,
582+
);
583+
584+
generators.push(ExecutorKit {
585+
source_info: description.source.clone(),
586+
// Defined activities have very simple generators, so the traits required are implemented on
587+
// a single struct which we just cheaply clone.
588+
network_generator: Arc::new(Mutex::new(activity_generator.clone())),
589+
payment_generator: Box::new(activity_generator),
590+
});
591+
}
592+
} else {
593+
generators = self.random_activity_nodes().await?;
594+
}
595+
596+
Ok(generators)
597+
}
598+
579599
/// Returns the list of nodes that are eligible for generating random activity on. This is the subset of nodes
580600
/// that have sufficient capacity to generate payments of our expected payment amount.
581-
async fn random_activity_nodes(
582-
&self,
583-
) -> Result<HashMap<PublicKey, (NodeInfo, u64)>, SimulationError> {
601+
async fn random_activity_nodes(&self) -> Result<Vec<ExecutorKit>, SimulationError> {
584602
// Collect capacity of each node from its view of its own channels. Total capacity is divided by two to
585603
// avoid double counting capacity (as each node has a counterparty in the channel).
586-
let mut node_capacities = HashMap::new();
604+
let mut generators = Vec::new();
605+
let mut active_nodes = HashMap::new();
606+
607+
// Do a first pass to get the capacity of each node which we need to be able to create a network generator.
608+
// While we're at it, we get the node info and store it with capacity to create activity generators in our
609+
// second pass.
587610
for (pk, node) in self.nodes.iter() {
588611
let chan_capacity = node.lock().await.list_channels().await?.iter().sum::<u64>();
589612

@@ -594,16 +617,39 @@ impl Simulation {
594617
continue;
595618
}
596619

597-
node_capacities.insert(
598-
*pk,
599-
(
600-
node.lock().await.get_node_info(pk).await?,
601-
chan_capacity / 2,
620+
// Don't double count channel capacity because each channel reports the total balance between counter
621+
// parities. Track capacity separately to be used for our network generator.
622+
let capacity = chan_capacity / 2;
623+
let node_info = node.lock().await.get_node_info(pk).await?;
624+
active_nodes.insert(node_info.pubkey, (node_info, capacity));
625+
}
626+
627+
let network_generator = Arc::new(Mutex::new(
628+
NetworkGraphView::new(active_nodes.values().cloned().collect())
629+
.map_err(SimulationError::RandomActivityError)?,
630+
));
631+
632+
log::info!(
633+
"Created network generator: {}.",
634+
network_generator.lock().await
635+
);
636+
637+
for (node_info, capacity) in active_nodes.values() {
638+
generators.push(ExecutorKit {
639+
source_info: node_info.clone(),
640+
network_generator: network_generator.clone(),
641+
payment_generator: Box::new(
642+
RandomPaymentActivity::new(
643+
*capacity,
644+
self.expected_payment_msat,
645+
self.activity_multiplier,
646+
)
647+
.map_err(SimulationError::RandomActivityError)?,
602648
),
603-
);
649+
});
604650
}
605651

606-
Ok(node_capacities)
652+
Ok(generators)
607653
}
608654

609655
/// Responsible for spinning up consumer tasks for each node specified in consuming_nodes. Assumes that validation
@@ -640,73 +686,26 @@ impl Simulation {
640686
channels
641687
}
642688

643-
/// Responsible for spinning up producers for a set of activity descriptions.
644-
async fn dispatch_activity_producers(
689+
/// Responsible for spinning up producers for a set of activities. Requires that a consumer channel is present
690+
/// for every source node in the set of executors.
691+
async fn dispatch_producers(
645692
&self,
693+
executors: Vec<ExecutorKit>,
646694
producer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
647695
tasks: &mut JoinSet<()>,
648-
) {
649-
for description in self.activity.iter() {
650-
let sender_chan = producer_channels.get(&description.source.pubkey).unwrap();
651-
let generator = DefinedPaymentActivity::new(
652-
description.destination.clone(),
653-
Duration::from_secs(description.interval_secs.into()),
654-
description.amount_msat,
655-
);
656-
657-
tasks.spawn(produce_events(
658-
description.source.clone(),
659-
Arc::new(Mutex::new(generator.clone())),
660-
generator,
661-
sender_chan.clone(),
662-
self.shutdown_trigger.clone(),
663-
self.shutdown_listener.clone(),
664-
));
665-
}
666-
}
667-
668-
/// Responsible for spinning up producers for a set of activity descriptions. Requires that node capacities are
669-
/// provided for each node represented in producer channels.
670-
async fn dispatch_random_producers(
671-
&self,
672-
node_capacities: HashMap<PublicKey, (NodeInfo, u64)>,
673-
consumer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
674-
tasks: &mut JoinSet<()>,
675696
) -> Result<(), SimulationError> {
676-
let network_generator = Arc::new(Mutex::new(
677-
NetworkGraphView::new(node_capacities.values().cloned().collect())
678-
.map_err(SimulationError::RandomActivityError)?,
679-
));
680-
681-
log::info!(
682-
"Created network generator: {}.",
683-
network_generator.lock().await
684-
);
685-
686-
for (pk, sender) in consumer_channels.into_iter() {
687-
let (info, source_capacity) = match node_capacities.get(&pk) {
688-
Some((info, capacity)) => (info.clone(), *capacity),
689-
None => {
690-
return Err(SimulationError::RandomActivityError(
691-
RandomActivityError::ValueError(format!(
692-
"Random activity generator run for: {} with unknown capacity.",
693-
pk
694-
)),
695-
));
696-
}
697-
};
698-
699-
let node_generator = RandomPaymentActivity::new(
700-
source_capacity,
701-
self.expected_payment_msat,
702-
self.activity_multiplier,
703-
)
704-
.map_err(SimulationError::RandomActivityError)?;
697+
for executor in executors {
698+
let sender = producer_channels.get(&executor.source_info.pubkey).ok_or(
699+
SimulationError::RandomActivityError(RandomActivityError::ValueError(format!(
700+
"Activity producer for: {} not found.",
701+
executor.source_info.pubkey,
702+
))),
703+
)?;
705704

706705
tasks.spawn(produce_events(
707-
info,
708-
network_generator.clone(),
709-
node_generator,
706+
executor.source_info,
707+
executor.network_generator,
708+
executor.payment_generator,
710709
sender.clone(),
711710
self.shutdown_trigger.clone(),
712711
self.shutdown_listener.clone(),
@@ -790,10 +789,10 @@ async fn consume_events(
790789

791790
// produce events generates events for the activity description provided. It accepts a shutdown listener so it can
792791
// exit if other threads signal that they have errored out.
793-
async fn produce_events<N: DestinationGenerator, A: PaymentGenerator + Display>(
792+
async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator + ?Sized>(
794793
source: NodeInfo,
795794
network_generator: Arc<Mutex<N>>,
796-
node_generator: A,
795+
node_generator: Box<A>,
797796
sender: Sender<SimulationEvent>,
798797
shutdown: Trigger,
799798
listener: Listener,

0 commit comments

Comments
 (0)