From 5ee13fb88e0de8ead4de6925e3a7b0cf2bd3babf Mon Sep 17 00:00:00 2001 From: Anyitechs Date: Thu, 20 Mar 2025 16:26:54 +0100 Subject: [PATCH 1/2] feat(background-processor): add BackgroundProcessorBuilder for optional components --- lightning-background-processor/src/lib.rs | 524 +++++++++++++++++++--- 1 file changed, 464 insertions(+), 60 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index dda54362e35..fc305870167 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -52,6 +52,8 @@ use lightning_liquidity::ALiquidityManager; use core::ops::Deref; use core::time::Duration; +#[cfg(feature = "std")] +use std::marker::PhantomData; #[cfg(feature = "std")] use core::sync::atomic::{AtomicBool, Ordering}; @@ -931,6 +933,10 @@ impl BackgroundProcessor { /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling /// either [`join`] or [`stop`]. /// + /// This method takes a [`BackgroundProcessorConfig`] object that contains all necessary components for + /// background processing. To build this configuration, you can use the [`BackgroundProcessorConfigBuilder`] + /// which provides a convenient builder pattern for setting up both required and optional components. + /// /// # Data Persistence /// /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or @@ -948,7 +954,7 @@ impl BackgroundProcessor { /// /// # Event Handling /// - /// `event_handler` is responsible for handling events that users should be notified of (e.g., + /// The `event_handler` in the configuration is responsible for handling events that users should be notified of (e.g., /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common /// functionality implemented by other handlers. /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures. @@ -999,9 +1005,32 @@ impl BackgroundProcessor { K: 'static + Deref, OS: 'static + Deref> + Send, >( - persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, - onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, - liquidity_manager: Option, sweeper: Option, logger: L, scorer: Option, + config: BackgroundProcessorConfig< + 'a, + UL, + CF, + T, + F, + G, + L, + P, + EH, + PS, + ES, + M, + CM, + OM, + PGS, + RGS, + PM, + LM, + S, + SC, + D, + O, + K, + OS, + >, ) -> Self where UL::Target: 'static + UtxoLookup, @@ -1022,6 +1051,19 @@ impl BackgroundProcessor { { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); + + let persister = config.persister; + let event_handler = config.event_handler; + let chain_monitor = config.chain_monitor; + let channel_manager = config.channel_manager; + let onion_messenger = config.onion_messenger; + let gossip_sync = config.gossip_sync; + let peer_manager = config.peer_manager; + let liquidity_manager = config.liquidity_manager; + let sweeper = config.sweeper; + let logger = config.logger; + let scorer = config.scorer; + let handle = thread::spawn(move || -> Result<(), std::io::Error> { let event_handler = |event| { let network_graph = gossip_sync.network_graph(); @@ -1144,6 +1186,304 @@ impl BackgroundProcessor { } } +/// Configuration for synchronous [`BackgroundProcessor`] +/// event processing. +/// +/// This configuration holds all components needed for background processing, +/// including required and optional components. +/// +/// The configuration can be constructed using [`BackgroundProcessorConfigBuilder`], which provides +/// a convenient builder pattern for setting up both required and optional components. +#[cfg(feature = "std")] +pub struct BackgroundProcessorConfig< + 'a, + UL: 'static + Deref, + CF: 'static + Deref, + T: 'static + Deref, + F: 'static + Deref + Send, + G: 'static + Deref>, + L: 'static + Deref + Send, + P: 'static + Deref, + EH: 'static + EventHandler + Send, + PS: 'static + Deref + Send, + ES: 'static + Deref + Send, + M: 'static + + Deref::Signer, CF, T, F, L, P, ES>> + + Send + + Sync, + CM: 'static + Deref + Send, + OM: 'static + Deref + Send, + PGS: 'static + Deref> + Send, + RGS: 'static + Deref> + Send, + PM: 'static + Deref + Send, + LM: 'static + Deref + Send, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, + D: 'static + Deref, + O: 'static + Deref, + K: 'static + Deref, + OS: 'static + Deref> + Send, +> where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, S>, + ES::Target: 'static + EntropySource, + CM::Target: AChannelManager, + OM::Target: AOnionMessenger, + PM::Target: APeerManager, + LM::Target: ALiquidityManager, + D::Target: ChangeDestinationSourceSync, + O::Target: 'static + OutputSpender, + K::Target: 'static + KVStore, +{ + persister: PS, + event_handler: EH, + chain_monitor: M, + channel_manager: CM, + onion_messenger: Option, + gossip_sync: GossipSync, + peer_manager: PM, + liquidity_manager: Option, + sweeper: Option, + logger: L, + scorer: Option, + _phantom: PhantomData<(&'a (), CF, T, F, P)>, +} + +/// A builder for constructing a [`BackgroundProcessorConfig`] with optional components. +/// +/// This builder provides a flexible and type-safe way to construct a [`BackgroundProcessorConfig`] +/// with optional and required components. It helps avoid specifying +/// concrete types for components that aren't being used. +#[cfg(feature = "std")] +pub struct BackgroundProcessorConfigBuilder< + 'a, + UL: 'static + Deref, + CF: 'static + Deref, + T: 'static + Deref, + F: 'static + Deref + Send, + G: 'static + Deref>, + L: 'static + Deref + Send, + P: 'static + Deref, + EH: 'static + EventHandler + Send, + PS: 'static + Deref + Send, + ES: 'static + Deref + Send, + M: 'static + + Deref::Signer, CF, T, F, L, P, ES>> + + Send + + Sync, + CM: 'static + Deref + Send, + OM: 'static + Deref + Send, + PGS: 'static + Deref> + Send, + RGS: 'static + Deref> + Send, + PM: 'static + Deref + Send, + LM: 'static + Deref + Send, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, + D: 'static + Deref, + O: 'static + Deref, + K: 'static + Deref, + OS: 'static + Deref> + Send, +> where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, S>, + ES::Target: 'static + EntropySource, + CM::Target: AChannelManager, + OM::Target: AOnionMessenger, + PM::Target: APeerManager, + LM::Target: ALiquidityManager, + D::Target: ChangeDestinationSourceSync, + O::Target: 'static + OutputSpender, + K::Target: 'static + KVStore, +{ + persister: PS, + event_handler: EH, + chain_monitor: M, + channel_manager: CM, + onion_messenger: Option, + gossip_sync: GossipSync, + peer_manager: PM, + liquidity_manager: Option, + sweeper: Option, + logger: L, + scorer: Option, + _phantom: PhantomData<(&'a (), CF, T, F, P)>, +} + +#[cfg(feature = "std")] +impl< + 'a, + UL: 'static + Deref, + CF: 'static + Deref, + T: 'static + Deref, + F: 'static + Deref + Send, + G: 'static + Deref>, + L: 'static + Deref + Send, + P: 'static + Deref, + EH: 'static + EventHandler + Send, + PS: 'static + Deref + Send, + ES: 'static + Deref + Send, + M: 'static + + Deref< + Target = ChainMonitor<::Signer, CF, T, F, L, P, ES>, + > + + Send + + Sync, + CM: 'static + Deref + Send, + OM: 'static + Deref + Send, + PGS: 'static + Deref> + Send, + RGS: 'static + Deref> + Send, + PM: 'static + Deref + Send, + LM: 'static + Deref + Send, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, + D: 'static + Deref, + O: 'static + Deref, + K: 'static + Deref, + OS: 'static + Deref> + Send, + > + BackgroundProcessorConfigBuilder< + 'a, + UL, + CF, + T, + F, + G, + L, + P, + EH, + PS, + ES, + M, + CM, + OM, + PGS, + RGS, + PM, + LM, + S, + SC, + D, + O, + K, + OS, + > where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, S>, + ES::Target: 'static + EntropySource, + CM::Target: AChannelManager, + OM::Target: AOnionMessenger, + PM::Target: APeerManager, + LM::Target: ALiquidityManager, + D::Target: ChangeDestinationSourceSync, + O::Target: 'static + OutputSpender, + K::Target: 'static + KVStore, +{ + /// Creates a new builder instance. + pub fn new( + persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + gossip_sync: GossipSync, peer_manager: PM, logger: L, + ) -> Self { + Self { + persister, + event_handler, + chain_monitor, + channel_manager, + onion_messenger: None, + gossip_sync, + peer_manager, + liquidity_manager: None, + sweeper: None, + logger, + scorer: None, + _phantom: PhantomData, + } + } + + /// Sets the optional onion messenger component. + pub fn with_onion_messenger(&mut self, onion_messenger: OM) -> &mut Self { + self.onion_messenger = Some(onion_messenger); + self + } + + /// Sets the optional scorer component. + pub fn with_scorer(&mut self, scorer: S) -> &mut Self { + self.scorer = Some(scorer); + self + } + + /// Sets the optional liquidity manager component. + pub fn with_liquidity_manager(&mut self, liquidity_manager: LM) -> &mut Self { + self.liquidity_manager = Some(liquidity_manager); + self + } + + /// Sets the optional sweeper component. + pub fn with_sweeper(&mut self, sweeper: OS) -> &mut Self { + self.sweeper = Some(sweeper); + self + } + + /// Builds and returns a [`BackgroundProcessorConfig`] object. + pub fn build( + self, + ) -> BackgroundProcessorConfig< + 'a, + UL, + CF, + T, + F, + G, + L, + P, + EH, + PS, + ES, + M, + CM, + OM, + PGS, + RGS, + PM, + LM, + S, + SC, + D, + O, + K, + OS, + > { + BackgroundProcessorConfig { + persister: self.persister, + event_handler: self.event_handler, + chain_monitor: self.chain_monitor, + channel_manager: self.channel_manager, + onion_messenger: self.onion_messenger, + gossip_sync: self.gossip_sync, + peer_manager: self.peer_manager, + liquidity_manager: self.liquidity_manager, + sweeper: self.sweeper, + logger: self.logger, + scorer: self.scorer, + _phantom: PhantomData, + } + } +} + #[cfg(feature = "std")] impl Drop for BackgroundProcessor { fn drop(&mut self) { @@ -1153,7 +1493,9 @@ impl Drop for BackgroundProcessor { #[cfg(all(feature = "std", test))] mod tests { - use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER}; + use super::{ + BackgroundProcessor, BackgroundProcessorConfigBuilder, GossipSync, FRESHNESS_TIMER, + }; use bitcoin::constants::{genesis_block, ChainHash}; use bitcoin::hashes::Hash; use bitcoin::locktime::absolute::LockTime; @@ -1938,19 +2280,24 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| Ok(()); - let bg_processor = BackgroundProcessor::start( + + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + let bg_processor = BackgroundProcessor::start(config); macro_rules! check_persisted_data { ($node: expr, $filepath: expr) => { @@ -2033,19 +2380,25 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| Ok(()); - let bg_processor = BackgroundProcessor::start( + + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + + let bg_processor = BackgroundProcessor::start(config); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); @@ -2077,19 +2430,24 @@ mod tests { Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"), ); let event_handler = |_: _| Ok(()); - let bg_processor = BackgroundProcessor::start( + + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + let bg_processor = BackgroundProcessor::start(config); match bg_processor.join() { Ok(_) => panic!("Expected error persisting manager"), Err(e) => { @@ -2148,19 +2506,25 @@ mod tests { let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| Ok(()); - let bg_processor = BackgroundProcessor::start( + + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + + let bg_processor = BackgroundProcessor::start(config); match bg_processor.stop() { Ok(_) => panic!("Expected error persisting network graph"), @@ -2179,19 +2543,25 @@ mod tests { let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| Ok(()); - let bg_processor = BackgroundProcessor::start( + + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + + let bg_processor = BackgroundProcessor::start(config); match bg_processor.stop() { Ok(_) => panic!("Expected error persisting scorer"), @@ -2227,19 +2597,24 @@ mod tests { Ok(()) }; - let bg_processor = BackgroundProcessor::start( + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + + let bg_processor = BackgroundProcessor::start(config); // Open a channel and check that the FundingGenerationReady event was handled. begin_open_channel!(nodes[0], nodes[1], channel_value); @@ -2291,19 +2666,25 @@ mod tests { Ok(()) }; let persister = Arc::new(Persister::new(data_dir)); - let bg_processor = BackgroundProcessor::start( + + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + + let bg_processor = BackgroundProcessor::start(config); // Force close the channel and check that the SpendableOutputs event was handled. let error_message = "Channel force-closed"; @@ -2456,19 +2837,24 @@ mod tests { Ok(()) }; - let bg_processor = BackgroundProcessor::start( + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + + let bg_processor = BackgroundProcessor::start(config); begin_open_channel!(nodes[0], nodes[1], channel_value); assert_eq!( @@ -2487,19 +2873,25 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| Ok(()); - let bg_processor = BackgroundProcessor::start( + + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + + let bg_processor = BackgroundProcessor::start(config); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); @@ -2584,19 +2976,25 @@ mod tests { let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); let event_handler = |_: _| Ok(()); - let background_processor = BackgroundProcessor::start( + + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + + let background_processor = BackgroundProcessor::start(config); do_test_not_pruning_network_graph_until_graph_sync_completion!( nodes, @@ -2781,19 +3179,25 @@ mod tests { let (_, nodes) = create_nodes(1, "test_payment_path_scoring"); let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let bg_processor = BackgroundProcessor::start( + + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)) + .with_sweeper(nodes[0].sweeper.clone()); + + let config = builder.build(); + + let bg_processor = BackgroundProcessor::start(config); do_test_payment_path_scoring!( nodes, From 9528df600e0e7854554dda158854dc58c942727b Mon Sep 17 00:00:00 2001 From: Anyitechs Date: Thu, 5 Jun 2025 00:32:00 +0100 Subject: [PATCH 2/2] introduce `BackgroundProcessorConfigAsyncBuilder` for `process_events_async` --- lightning-background-processor/src/lib.rs | 451 ++++++++++++++++++++-- 1 file changed, 420 insertions(+), 31 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index fc305870167..a5bf59fff91 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -50,10 +50,9 @@ use lightning_rapid_gossip_sync::RapidGossipSync; use lightning_liquidity::ALiquidityManager; +use core::marker::PhantomData; use core::ops::Deref; use core::time::Duration; -#[cfg(feature = "std")] -use std::marker::PhantomData; #[cfg(feature = "std")] use core::sync::atomic::{AtomicBool, Ordering}; @@ -649,7 +648,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # use std::sync::{Arc, RwLock}; /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use std::time::SystemTime; -/// # use lightning_background_processor::{process_events_async, GossipSync}; +/// # use lightning_background_processor::{process_events_async, BackgroundProcessorConfigAsyncBuilder, GossipSync}; /// # struct Logger {} /// # impl lightning::util::logger::Logger for Logger { /// # fn log(&self, _record: lightning::util::logger::Record) {} @@ -746,24 +745,32 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();" )] #[cfg_attr(not(feature = "std"), doc = " rt.block_on(async move {")] -/// process_events_async( +/// // Create a configuration builder with required components +/// let mut builder = BackgroundProcessorConfigAsyncBuilder::new( /// background_persister, /// |e| background_event_handler.handle_event(e), /// background_chain_mon, /// background_chan_man, -/// Some(background_onion_messenger), /// background_gossip_sync, /// background_peer_man, -/// Some(background_liquidity_manager), -/// Some(background_sweeper), /// background_logger, -/// Some(background_scorer), /// sleeper, /// mobile_interruptable_platform, /// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap()) -/// ) -/// .await -/// .expect("Failed to process events"); +/// ); +/// +/// // Add optional components +/// builder +/// .with_onion_messenger(background_onion_messenger) +/// .with_liquidity_manager(background_liquidity_manager) +/// .with_sweeper(background_sweeper) +/// .with_scorer(background_scorer); +/// +/// // Build the config and start processing events +/// let config = builder.build(); +/// process_events_async(config) +/// .await +/// .expect("Failed to process events"); /// }); /// /// // Stop the background processing. @@ -804,10 +811,36 @@ pub async fn process_events_async< Sleeper: Fn(Duration) -> SleepFuture, FetchTime: Fn() -> Option, >( - persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, - onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, - liquidity_manager: Option, sweeper: Option, logger: L, scorer: Option, - sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, + config: BackgroundProcessorConfigAsync< + 'a, + UL, + CF, + T, + F, + G, + L, + P, + EventHandlerFuture, + EventHandler, + PS, + ES, + M, + CM, + OM, + PGS, + RGS, + PM, + LM, + D, + O, + K, + OS, + S, + SC, + SleepFuture, + Sleeper, + FetchTime, + >, ) -> Result<(), lightning::io::Error> where UL::Target: 'static + UtxoLookup, @@ -826,6 +859,21 @@ where D::Target: 'static + ChangeDestinationSource, K::Target: 'static + KVStore, { + let persister = config.persister; + let event_handler = config.event_handler; + let chain_monitor = config.chain_monitor; + let channel_manager = config.channel_manager; + let onion_messenger = config.onion_messenger; + let gossip_sync = config.gossip_sync; + let peer_manager = config.peer_manager; + let liquidity_manager = config.liquidity_manager; + let sweeper = config.sweeper; + let logger = config.logger; + let scorer = config.scorer; + let sleeper = config.sleeper; + let mobile_interruptable_platform = config.mobile_interruptable_platform; + let fetch_time = config.fetch_time; + let mut should_break = false; let async_event_handler = |event| { let network_graph = gossip_sync.network_graph(); @@ -924,6 +972,330 @@ where ) } +/// Configuration for asynchronous event processing. +/// +/// This configuration holds all components needed for asynchronous background processing, +/// including required components and optional components. +pub struct BackgroundProcessorConfigAsync< + 'a, + UL: 'static + Deref, + CF: 'static + Deref, + T: 'static + Deref, + F: 'static + Deref, + G: 'static + Deref>, + L: 'static + Deref, + P: 'static + Deref, + EventHandlerFuture: core::future::Future>, + EventHandler: Fn(Event) -> EventHandlerFuture, + PS: 'static + Deref + Send, + ES: 'static + Deref + Send, + M: 'static + + Deref::Signer, CF, T, F, L, P, ES>> + + Send + + Sync, + CM: 'static + Deref, + OM: 'static + Deref, + PGS: 'static + Deref>, + RGS: 'static + Deref>, + PM: 'static + Deref, + LM: 'static + Deref, + D: 'static + Deref, + O: 'static + Deref, + K: 'static + Deref, + OS: 'static + Deref>, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, + SleepFuture: core::future::Future + core::marker::Unpin, + Sleeper: Fn(Duration) -> SleepFuture, + FetchTime: Fn() -> Option, +> where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, S>, + ES::Target: 'static + EntropySource, + CM::Target: AChannelManager, + OM::Target: AOnionMessenger, + PM::Target: APeerManager, + LM::Target: ALiquidityManager, + O::Target: 'static + OutputSpender, + D::Target: 'static + ChangeDestinationSource, + K::Target: 'static + KVStore, +{ + persister: PS, + event_handler: EventHandler, + chain_monitor: M, + channel_manager: CM, + onion_messenger: Option, + gossip_sync: GossipSync, + peer_manager: PM, + liquidity_manager: Option, + sweeper: Option, + logger: L, + scorer: Option, + sleeper: Sleeper, + mobile_interruptable_platform: bool, + fetch_time: FetchTime, + _phantom: PhantomData<&'a ()>, +} + +/// Builder for creating asynchronous background processor configurations. +/// +/// This builder provides a flexible and type-safe way to construct a [`BackgroundProcessorConfigAsync`] +/// object with both required and optional components. It follows the builder +/// pattern and helps avoid specifying concrete types for each component. +pub struct BackgroundProcessorConfigAsyncBuilder< + 'a, + UL: 'static + Deref, + CF: 'static + Deref, + T: 'static + Deref, + F: 'static + Deref, + G: 'static + Deref>, + L: 'static + Deref, + P: 'static + Deref, + EventHandlerFuture: core::future::Future>, + EventHandler: Fn(Event) -> EventHandlerFuture, + PS: 'static + Deref + Send, + ES: 'static + Deref + Send, + M: 'static + + Deref::Signer, CF, T, F, L, P, ES>> + + Send + + Sync, + CM: 'static + Deref, + OM: 'static + Deref, + PGS: 'static + Deref>, + RGS: 'static + Deref>, + PM: 'static + Deref, + LM: 'static + Deref, + D: 'static + Deref, + O: 'static + Deref, + K: 'static + Deref, + OS: 'static + Deref>, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, + SleepFuture: core::future::Future + core::marker::Unpin, + Sleeper: Fn(Duration) -> SleepFuture, + FetchTime: Fn() -> Option, +> where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, S>, + ES::Target: 'static + EntropySource, + CM::Target: AChannelManager, + OM::Target: AOnionMessenger, + PM::Target: APeerManager, + LM::Target: ALiquidityManager, + O::Target: 'static + OutputSpender, + D::Target: 'static + ChangeDestinationSource, + K::Target: 'static + KVStore, +{ + persister: PS, + event_handler: EventHandler, + chain_monitor: M, + channel_manager: CM, + onion_messenger: Option, + gossip_sync: GossipSync, + peer_manager: PM, + liquidity_manager: Option, + sweeper: Option, + logger: L, + scorer: Option, + sleeper: Sleeper, + mobile_interruptable_platform: bool, + fetch_time: FetchTime, + _phantom: PhantomData<&'a ()>, +} + +impl< + 'a, + UL: 'static + Deref, + CF: 'static + Deref, + T: 'static + Deref, + F: 'static + Deref, + G: 'static + Deref>, + L: 'static + Deref, + P: 'static + Deref, + EventHandlerFuture: core::future::Future>, + EventHandler: Fn(Event) -> EventHandlerFuture, + PS: 'static + Deref + Send, + ES: 'static + Deref + Send, + M: 'static + + Deref< + Target = ChainMonitor<::Signer, CF, T, F, L, P, ES>, + > + + Send + + Sync, + CM: 'static + Deref, + OM: 'static + Deref, + PGS: 'static + Deref>, + RGS: 'static + Deref>, + PM: 'static + Deref, + LM: 'static + Deref, + D: 'static + Deref, + O: 'static + Deref, + K: 'static + Deref, + OS: 'static + Deref>, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, + SleepFuture: core::future::Future + core::marker::Unpin, + Sleeper: Fn(Duration) -> SleepFuture, + FetchTime: Fn() -> Option, + > + BackgroundProcessorConfigAsyncBuilder< + 'a, + UL, + CF, + T, + F, + G, + L, + P, + EventHandlerFuture, + EventHandler, + PS, + ES, + M, + CM, + OM, + PGS, + RGS, + PM, + LM, + D, + O, + K, + OS, + S, + SC, + SleepFuture, + Sleeper, + FetchTime, + > where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, S>, + ES::Target: 'static + EntropySource, + CM::Target: AChannelManager, + OM::Target: AOnionMessenger, + PM::Target: APeerManager, + LM::Target: ALiquidityManager, + O::Target: 'static + OutputSpender, + D::Target: 'static + ChangeDestinationSource, + K::Target: 'static + KVStore, +{ + /// Creates a new [`BackgroundProcessorConfigAsyncBuilder`] with the given components. + pub fn new( + persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, + gossip_sync: GossipSync, peer_manager: PM, logger: L, sleeper: Sleeper, + mobile_interruptable_platform: bool, fetch_time: FetchTime, + ) -> Self { + Self { + persister, + event_handler, + chain_monitor, + channel_manager, + onion_messenger: None, + gossip_sync, + peer_manager, + liquidity_manager: None, + sweeper: None, + logger, + scorer: None, + sleeper, + mobile_interruptable_platform, + fetch_time, + _phantom: PhantomData, + } + } + + /// Sets the optional onion messenger component. + pub fn with_onion_messenger(&mut self, onion_messenger: OM) -> &mut Self { + self.onion_messenger = Some(onion_messenger); + self + } + + /// Sets the optional liquidity manager component + pub fn with_liquidity_manager(&mut self, liquidity_manager: LM) -> &mut Self { + self.liquidity_manager = Some(liquidity_manager); + self + } + + /// Sets the optional sweeper component + pub fn with_sweeper(&mut self, sweeper: OS) -> &mut Self { + self.sweeper = Some(sweeper); + self + } + + /// Sets the optional scorer component, + pub fn with_scorer(&mut self, scorer: S) -> &mut Self { + self.scorer = Some(scorer); + self + } + + /// Builds and returns a [`BackgroundProcessorConfigAsync`] object. + pub fn build( + self, + ) -> BackgroundProcessorConfigAsync< + 'a, + UL, + CF, + T, + F, + G, + L, + P, + EventHandlerFuture, + EventHandler, + PS, + ES, + M, + CM, + OM, + PGS, + RGS, + PM, + LM, + D, + O, + K, + OS, + S, + SC, + SleepFuture, + Sleeper, + FetchTime, + > { + BackgroundProcessorConfigAsync { + persister: self.persister, + event_handler: self.event_handler, + chain_monitor: self.chain_monitor, + channel_manager: self.channel_manager, + onion_messenger: self.onion_messenger, + gossip_sync: self.gossip_sync, + peer_manager: self.peer_manager, + liquidity_manager: self.liquidity_manager, + sweeper: self.sweeper, + logger: self.logger, + scorer: self.scorer, + sleeper: self.sleeper, + mobile_interruptable_platform: self.mobile_interruptable_platform, + fetch_time: self.fetch_time, + _phantom: PhantomData, + } + } +} + #[cfg(feature = "std")] impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the [top-level @@ -1494,7 +1866,8 @@ impl Drop for BackgroundProcessor { #[cfg(all(feature = "std", test))] mod tests { use super::{ - BackgroundProcessor, BackgroundProcessorConfigBuilder, GossipSync, FRESHNESS_TIMER, + BackgroundProcessor, BackgroundProcessorConfigAsyncBuilder, + BackgroundProcessorConfigBuilder, GossipSync, FRESHNESS_TIMER, }; use bitcoin::constants::{genesis_block, ChainHash}; use bitcoin::hashes::Hash; @@ -2468,18 +2841,14 @@ mod tests { Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"), ); - let bp_future = super::process_events_async( + let mut builder = BackgroundProcessorConfigAsyncBuilder::new( persister, |_: _| async { Ok(()) }, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.sweeper_async()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), move |dur: Duration| { Box::pin(async move { tokio::time::sleep(dur).await; @@ -2489,6 +2858,15 @@ mod tests { false, || Some(Duration::ZERO), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_sweeper(nodes[0].sweeper.sweeper_async()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)); + + let config = builder.build(); + + let bp_future = super::process_events_async(config); match bp_future.await { Ok(_) => panic!("Expected error persisting manager"), Err(e) => { @@ -3015,18 +3393,15 @@ mod tests { let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); - let bp_future = super::process_events_async( + + let mut builder = BackgroundProcessorConfigAsyncBuilder::new( persister, |_: _| async { Ok(()) }, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.sweeper_async()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), move |dur: Duration| { let mut exit_receiver = exit_receiver.clone(); Box::pin(async move { @@ -3039,6 +3414,15 @@ mod tests { false, || Some(Duration::from_secs(1696300000)), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_sweeper(nodes[0].sweeper.sweeper_async()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)); + + let config = builder.build(); + + let bp_future = super::process_events_async(config); let t1 = tokio::spawn(bp_future); let t2 = tokio::spawn(async move { @@ -3236,18 +3620,14 @@ mod tests { let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); - let bp_future = super::process_events_async( + let mut builder = BackgroundProcessorConfigAsyncBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), - Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.sweeper_async()), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), move |dur: Duration| { let mut exit_receiver = exit_receiver.clone(); Box::pin(async move { @@ -3260,6 +3640,15 @@ mod tests { false, || Some(Duration::ZERO), ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()) + .with_sweeper(nodes[0].sweeper.sweeper_async()) + .with_liquidity_manager(Arc::clone(&nodes[0].liquidity_manager)); + + let config = builder.build(); + + let bp_future = super::process_events_async(config); let t1 = tokio::spawn(bp_future); let t2 = tokio::spawn(async move { do_test_payment_path_scoring!(nodes, receiver.recv().await);