Skip to content

Commit f4b02b7

Browse files
committed
Shutdown: Wait for event processing to fully stop
.. before initiating the Runtime shutdown.
1 parent 3e6176c commit f4b02b7

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

src/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,6 +972,7 @@ fn build_with_store_internal(
972972
};
973973

974974
let (stop_sender, _) = tokio::sync::watch::channel(());
975+
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
975976

976977
let is_listening = Arc::new(AtomicBool::new(false));
977978
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
@@ -984,6 +985,7 @@ fn build_with_store_internal(
984985
Ok(Node {
985986
runtime,
986987
stop_sender,
988+
event_handling_stopped_sender,
987989
config,
988990
wallet,
989991
tx_sync,

src/lib.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ uniffi::include_scaffolding!("ldk_node");
174174
pub struct Node {
175175
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
176176
stop_sender: tokio::sync::watch::Sender<()>,
177+
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
177178
config: Arc<Config>,
178179
wallet: Arc<Wallet>,
179180
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -727,6 +728,7 @@ impl Node {
727728
};
728729

729730
let background_stop_logger = Arc::clone(&self.logger);
731+
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
730732
runtime.spawn(async move {
731733
process_events_async(
732734
background_persister,
@@ -747,6 +749,18 @@ impl Node {
747749
panic!("Failed to process events");
748750
});
749751
log_trace!(background_stop_logger, "Events processing stopped.",);
752+
753+
match event_handling_stopped_sender.send(()) {
754+
Ok(_) => (),
755+
Err(e) => {
756+
log_error!(
757+
background_stop_logger,
758+
"Failed to send 'events handling stopped' signal. This should never happen: {}",
759+
e
760+
);
761+
debug_assert!(false);
762+
},
763+
}
750764
});
751765

752766
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
@@ -796,9 +810,55 @@ impl Node {
796810
},
797811
}
798812

799-
// Stop disconnect peers.
813+
// Disconnect all peers.
800814
self.peer_manager.disconnect_all_peers();
801815

816+
// Wait until event handling stopped, at least until a timeout is reached.
817+
let event_handling_stopped_logger = Arc::clone(&self.logger);
818+
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
819+
820+
// FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
821+
// event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
822+
// should drop this considerably post upgrading to BDK 1.0.
823+
let timeout_res = runtime.block_on(async {
824+
tokio::time::timeout(
825+
Duration::from_secs(100),
826+
event_handling_stopped_receiver.changed(),
827+
)
828+
.await
829+
});
830+
831+
match timeout_res {
832+
Ok(stop_res) => match stop_res {
833+
Ok(()) => {},
834+
Err(e) => {
835+
log_error!(
836+
event_handling_stopped_logger,
837+
"Stopping event handling failed. This should never happen: {}",
838+
e
839+
);
840+
panic!("Stopping event handling failed. This should never happen.");
841+
},
842+
},
843+
Err(e) => {
844+
log_error!(
845+
event_handling_stopped_logger,
846+
"Stopping event handling timed out: {}",
847+
e
848+
);
849+
},
850+
}
851+
852+
#[cfg(tokio_unstable)]
853+
{
854+
log_trace!(
855+
self.logger,
856+
"Active runtime tasks left prior to shutdown: {}",
857+
runtime.metrics().active_tasks_count()
858+
);
859+
}
860+
861+
// Shutdown our runtime. By now ~no or only very few tasks should be left.
802862
runtime.shutdown_timeout(Duration::from_secs(10));
803863

804864
log_info!(self.logger, "Shutdown complete.");

0 commit comments

Comments
 (0)