@@ -347,7 +347,13 @@ impl Builder {
347
347
EsploraBlockchain :: from_client ( tx_sync. client ( ) . clone ( ) , BDK_CLIENT_STOP_GAP )
348
348
. with_concurrency ( BDK_CLIENT_CONCURRENCY ) ;
349
349
350
- let wallet = Arc :: new ( Wallet :: new ( blockchain, bdk_wallet, Arc :: clone ( & logger) ) ) ;
350
+ let runtime = Arc :: new ( RwLock :: new ( None ) ) ;
351
+ let wallet = Arc :: new ( Wallet :: new (
352
+ blockchain,
353
+ bdk_wallet,
354
+ Arc :: clone ( & runtime) ,
355
+ Arc :: clone ( & logger) ,
356
+ ) ) ;
351
357
352
358
let kv_store = Arc :: new ( FilesystemStore :: new ( ldk_data_dir. clone ( ) . into ( ) ) ) ;
353
359
@@ -556,10 +562,11 @@ impl Builder {
556
562
}
557
563
} ;
558
564
559
- let running = RwLock :: new ( None ) ;
565
+ let stop_running = Arc :: new ( AtomicBool :: new ( false ) ) ;
560
566
561
567
Node {
562
- running,
568
+ runtime,
569
+ stop_running,
563
570
config,
564
571
wallet,
565
572
tx_sync,
@@ -579,18 +586,12 @@ impl Builder {
579
586
}
580
587
}
581
588
582
- /// Wraps all objects that need to be preserved during the run time of [`Node`]. Will be dropped
583
- /// upon [`Node::stop()`].
584
- struct Runtime {
585
- tokio_runtime : Arc < tokio:: runtime:: Runtime > ,
586
- stop_runtime : Arc < AtomicBool > ,
587
- }
588
-
589
589
/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
590
590
///
591
591
/// Needs to be initialized and instantiated through [`Builder::build`].
592
592
pub struct Node {
593
- running : RwLock < Option < Runtime > > ,
593
+ runtime : Arc < RwLock < Option < tokio:: runtime:: Runtime > > > ,
594
+ stop_running : Arc < AtomicBool > ,
594
595
config : Arc < Config > ,
595
596
wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > ,
596
597
tx_sync : Arc < EsploraSyncClient < Arc < FilesystemLogger > > > ,
@@ -616,49 +617,15 @@ impl Node {
616
617
/// a thread-safe manner.
617
618
pub fn start ( & self ) -> Result < ( ) , Error > {
618
619
// Acquire a run lock and hold it until we're setup.
619
- let mut run_lock = self . running . write ( ) . unwrap ( ) ;
620
- if run_lock . is_some ( ) {
620
+ let mut runtime_lock = self . runtime . write ( ) . unwrap ( ) ;
621
+ if runtime_lock . is_some ( ) {
621
622
// We're already running.
622
623
return Err ( Error :: AlreadyRunning ) ;
623
624
}
624
625
625
- let runtime = self . setup_runtime ( ) ?;
626
- * run_lock = Some ( runtime) ;
627
- Ok ( ( ) )
628
- }
629
-
630
- /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
631
- ///
632
- /// After this returns most API methods will return [`Error::NotRunning`].
633
- pub fn stop ( & self ) -> Result < ( ) , Error > {
634
- let mut run_lock = self . running . write ( ) . unwrap ( ) ;
635
- if run_lock. is_none ( ) {
636
- return Err ( Error :: NotRunning ) ;
637
- }
638
-
639
- let runtime = run_lock. as_ref ( ) . unwrap ( ) ;
640
-
641
- // Stop the runtime.
642
- runtime. stop_runtime . store ( true , Ordering :: Release ) ;
643
-
644
- // Stop disconnect peers.
645
- self . peer_manager . disconnect_all_peers ( ) ;
646
-
647
- // Drop the held runtimes.
648
- self . wallet . drop_runtime ( ) ;
649
-
650
- // Drop the runtime, which stops the background processor and any possibly remaining tokio threads.
651
- * run_lock = None ;
652
- Ok ( ( ) )
653
- }
654
-
655
- fn setup_runtime ( & self ) -> Result < Runtime , Error > {
656
- let tokio_runtime =
657
- Arc :: new ( tokio:: runtime:: Builder :: new_multi_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) ) ;
658
-
659
- self . wallet . set_runtime ( Arc :: clone ( & tokio_runtime) ) ;
626
+ let runtime = tokio:: runtime:: Builder :: new_multi_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) ;
660
627
661
- let stop_runtime = Arc :: new ( AtomicBool :: new ( false ) ) ;
628
+ let stop_running = Arc :: new ( AtomicBool :: new ( false ) ) ;
662
629
663
630
let event_handler = Arc :: new ( EventHandler :: new (
664
631
Arc :: clone ( & self . wallet ) ,
@@ -667,7 +634,7 @@ impl Node {
667
634
Arc :: clone ( & self . network_graph ) ,
668
635
Arc :: clone ( & self . keys_manager ) ,
669
636
Arc :: clone ( & self . payment_store ) ,
670
- Arc :: clone ( & tokio_runtime ) ,
637
+ Arc :: clone ( & self . runtime ) ,
671
638
Arc :: clone ( & self . logger ) ,
672
639
Arc :: clone ( & self . config ) ,
673
640
) ) ;
@@ -678,7 +645,7 @@ impl Node {
678
645
let sync_cman = Arc :: clone ( & self . channel_manager ) ;
679
646
let sync_cmon = Arc :: clone ( & self . chain_monitor ) ;
680
647
let sync_logger = Arc :: clone ( & self . logger ) ;
681
- let stop_sync = Arc :: clone ( & stop_runtime ) ;
648
+ let stop_sync = Arc :: clone ( & stop_running ) ;
682
649
683
650
std:: thread:: spawn ( move || {
684
651
tokio:: runtime:: Builder :: new_current_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) . block_on (
@@ -709,8 +676,8 @@ impl Node {
709
676
} ) ;
710
677
711
678
let sync_logger = Arc :: clone ( & self . logger ) ;
712
- let stop_sync = Arc :: clone ( & stop_runtime ) ;
713
- tokio_runtime . spawn ( async move {
679
+ let stop_sync = Arc :: clone ( & stop_running ) ;
680
+ runtime . spawn ( async move {
714
681
loop {
715
682
if stop_sync. load ( Ordering :: Acquire ) {
716
683
return ;
@@ -737,10 +704,10 @@ impl Node {
737
704
if let Some ( listening_address) = & self . config . listening_address {
738
705
// Setup networking
739
706
let peer_manager_connection_handler = Arc :: clone ( & self . peer_manager ) ;
740
- let stop_listen = Arc :: clone ( & stop_runtime ) ;
707
+ let stop_listen = Arc :: clone ( & stop_running ) ;
741
708
let listening_address = listening_address. clone ( ) ;
742
709
743
- tokio_runtime . spawn ( async move {
710
+ runtime . spawn ( async move {
744
711
let listener =
745
712
tokio:: net:: TcpListener :: bind ( listening_address) . await . expect (
746
713
"Failed to bind to listen address/port - is something else already listening on it?" ,
@@ -767,8 +734,8 @@ impl Node {
767
734
let connect_pm = Arc :: clone ( & self . peer_manager ) ;
768
735
let connect_logger = Arc :: clone ( & self . logger ) ;
769
736
let connect_peer_store = Arc :: clone ( & self . peer_store ) ;
770
- let stop_connect = Arc :: clone ( & stop_runtime ) ;
771
- tokio_runtime . spawn ( async move {
737
+ let stop_connect = Arc :: clone ( & stop_running ) ;
738
+ runtime . spawn ( async move {
772
739
let mut interval = tokio:: time:: interval ( PEER_RECONNECTION_INTERVAL ) ;
773
740
loop {
774
741
if stop_connect. load ( Ordering :: Acquire ) {
@@ -808,7 +775,7 @@ impl Node {
808
775
let background_peer_man = Arc :: clone ( & self . peer_manager ) ;
809
776
let background_logger = Arc :: clone ( & self . logger ) ;
810
777
let background_scorer = Arc :: clone ( & self . scorer ) ;
811
- let stop_background_processing = Arc :: clone ( & stop_runtime ) ;
778
+ let stop_background_processing = Arc :: clone ( & stop_running ) ;
812
779
let sleeper = move |d| {
813
780
let stop = Arc :: clone ( & stop_background_processing) ;
814
781
Box :: pin ( async move {
@@ -821,7 +788,7 @@ impl Node {
821
788
} )
822
789
} ;
823
790
824
- tokio_runtime . spawn ( async move {
791
+ runtime . spawn ( async move {
825
792
process_events_async (
826
793
background_persister,
827
794
|e| background_event_handler. handle_event ( e) ,
@@ -838,7 +805,23 @@ impl Node {
838
805
. expect ( "Failed to process events" ) ;
839
806
} ) ;
840
807
841
- Ok ( Runtime { tokio_runtime, stop_runtime } )
808
+ * runtime_lock = Some ( runtime) ;
809
+ Ok ( ( ) )
810
+ }
811
+
812
+ /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
813
+ ///
814
+ /// After this returns most API methods will return [`Error::NotRunning`].
815
+ pub fn stop ( & self ) -> Result < ( ) , Error > {
816
+ let runtime = self . runtime . write ( ) . unwrap ( ) . take ( ) . ok_or ( Error :: NotRunning ) ?;
817
+ // Stop the runtime.
818
+ self . stop_running . store ( true , Ordering :: Release ) ;
819
+
820
+ // Stop disconnect peers.
821
+ self . peer_manager . disconnect_all_peers ( ) ;
822
+
823
+ runtime. shutdown_timeout ( Duration :: from_secs ( 10 ) ) ;
824
+ Ok ( ( ) )
842
825
}
843
826
844
827
/// Blocks until the next event is available.
@@ -915,12 +898,11 @@ impl Node {
915
898
pub fn connect (
916
899
& self , node_id : PublicKey , address : SocketAddr , permanently : bool ,
917
900
) -> Result < ( ) , Error > {
918
- let runtime_lock = self . running . read ( ) . unwrap ( ) ;
919
- if runtime_lock . is_none ( ) {
901
+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
902
+ if rt_lock . is_none ( ) {
920
903
return Err ( Error :: NotRunning ) ;
921
904
}
922
-
923
- let runtime = runtime_lock. as_ref ( ) . unwrap ( ) ;
905
+ let runtime = rt_lock. as_ref ( ) . unwrap ( ) ;
924
906
925
907
let peer_info = PeerInfo { pubkey : node_id, address } ;
926
908
@@ -932,7 +914,7 @@ impl Node {
932
914
let con_pm = Arc :: clone ( & self . peer_manager ) ;
933
915
934
916
tokio:: task:: block_in_place ( move || {
935
- runtime. tokio_runtime . block_on ( async move {
917
+ runtime. block_on ( async move {
936
918
let res =
937
919
connect_peer_if_necessary ( con_peer_pubkey, con_peer_addr, con_pm, con_logger)
938
920
. await ;
@@ -958,8 +940,8 @@ impl Node {
958
940
/// Will also remove the peer from the peer store, i.e., after this has been called we won't
959
941
/// try to reconnect on restart.
960
942
pub fn disconnect ( & self , counterparty_node_id : & PublicKey ) -> Result < ( ) , Error > {
961
- let runtime_lock = self . running . read ( ) . unwrap ( ) ;
962
- if runtime_lock . is_none ( ) {
943
+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
944
+ if rt_lock . is_none ( ) {
963
945
return Err ( Error :: NotRunning ) ;
964
946
}
965
947
@@ -989,12 +971,11 @@ impl Node {
989
971
& self , node_id : PublicKey , address : SocketAddr , channel_amount_sats : u64 ,
990
972
push_to_counterparty_msat : Option < u64 > , announce_channel : bool ,
991
973
) -> Result < ( ) , Error > {
992
- let runtime_lock = self . running . read ( ) . unwrap ( ) ;
993
- if runtime_lock . is_none ( ) {
974
+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
975
+ if rt_lock . is_none ( ) {
994
976
return Err ( Error :: NotRunning ) ;
995
977
}
996
-
997
- let runtime = runtime_lock. as_ref ( ) . unwrap ( ) ;
978
+ let runtime = rt_lock. as_ref ( ) . unwrap ( ) ;
998
979
999
980
let cur_balance = self . wallet . get_balance ( ) ?;
1000
981
if cur_balance. get_spendable ( ) < channel_amount_sats {
@@ -1012,7 +993,7 @@ impl Node {
1012
993
let con_pm = Arc :: clone ( & self . peer_manager ) ;
1013
994
1014
995
tokio:: task:: block_in_place ( move || {
1015
- runtime. tokio_runtime . block_on ( async move {
996
+ runtime. block_on ( async move {
1016
997
let res =
1017
998
connect_peer_if_necessary ( con_peer_pubkey, con_peer_addr, con_pm, con_logger)
1018
999
. await ;
@@ -1067,10 +1048,12 @@ impl Node {
1067
1048
///
1068
1049
/// Note that the wallets will be also synced regularly in the background.
1069
1050
pub fn sync_wallets ( & self ) -> Result < ( ) , Error > {
1070
- let runtime_lock = self . running . read ( ) . unwrap ( ) ;
1071
- if runtime_lock . is_none ( ) {
1051
+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1052
+ if rt_lock . is_none ( ) {
1072
1053
return Err ( Error :: NotRunning ) ;
1073
1054
}
1055
+ let runtime = rt_lock. as_ref ( ) . unwrap ( ) ;
1056
+
1074
1057
let wallet = Arc :: clone ( & self . wallet ) ;
1075
1058
let tx_sync = Arc :: clone ( & self . tx_sync ) ;
1076
1059
let sync_cman = Arc :: clone ( & self . channel_manager ) ;
@@ -1081,7 +1064,6 @@ impl Node {
1081
1064
& * sync_cmon as & ( dyn Confirm + Sync + Send ) ,
1082
1065
] ;
1083
1066
1084
- let runtime = runtime_lock. as_ref ( ) . unwrap ( ) ;
1085
1067
tokio:: task:: block_in_place ( move || {
1086
1068
tokio:: runtime:: Builder :: new_current_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) . block_on (
1087
1069
async move {
@@ -1106,7 +1088,7 @@ impl Node {
1106
1088
1107
1089
let sync_logger = Arc :: clone ( & self . logger ) ;
1108
1090
tokio:: task:: block_in_place ( move || {
1109
- runtime. tokio_runtime . block_on ( async move {
1091
+ runtime. block_on ( async move {
1110
1092
let now = Instant :: now ( ) ;
1111
1093
match tx_sync. sync ( confirmables) . await {
1112
1094
Ok ( ( ) ) => {
@@ -1141,7 +1123,8 @@ impl Node {
1141
1123
1142
1124
/// Send a payement given an invoice.
1143
1125
pub fn send_payment ( & self , invoice : & Invoice ) -> Result < PaymentHash , Error > {
1144
- if self . running . read ( ) . unwrap ( ) . is_none ( ) {
1126
+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1127
+ if rt_lock. is_none ( ) {
1145
1128
return Err ( Error :: NotRunning ) ;
1146
1129
}
1147
1130
@@ -1207,7 +1190,8 @@ impl Node {
1207
1190
pub fn send_payment_using_amount (
1208
1191
& self , invoice : & Invoice , amount_msat : u64 ,
1209
1192
) -> Result < PaymentHash , Error > {
1210
- if self . running . read ( ) . unwrap ( ) . is_none ( ) {
1193
+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1194
+ if rt_lock. is_none ( ) {
1211
1195
return Err ( Error :: NotRunning ) ;
1212
1196
}
1213
1197
@@ -1295,7 +1279,8 @@ impl Node {
1295
1279
pub fn send_spontaneous_payment (
1296
1280
& self , amount_msat : u64 , node_id : & PublicKey ,
1297
1281
) -> Result < PaymentHash , Error > {
1298
- if self . running . read ( ) . unwrap ( ) . is_none ( ) {
1282
+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1283
+ if rt_lock. is_none ( ) {
1299
1284
return Err ( Error :: NotRunning ) ;
1300
1285
}
1301
1286
0 commit comments