Skip to content

Commit 45fe8b2

Browse files
committed
Shutdown: Wait for event processing to fully stop
.. before initiating the Runtime shutdown.
1 parent d062439 commit 45fe8b2

File tree

2 files changed

+55
-1
lines changed

2 files changed

+55
-1
lines changed

src/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,6 +972,7 @@ fn build_with_store_internal(
972972
};
973973

974974
let (stop_sender, _) = tokio::sync::watch::channel(());
975+
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
975976

976977
let is_listening = Arc::new(AtomicBool::new(false));
977978
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
@@ -983,6 +984,7 @@ fn build_with_store_internal(
983984
Ok(Node {
984985
runtime,
985986
stop_sender,
987+
event_handling_stopped_sender,
986988
config,
987989
wallet,
988990
tx_sync,

src/lib.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ uniffi::include_scaffolding!("ldk_node");
171171
pub struct Node {
172172
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
173173
stop_sender: tokio::sync::watch::Sender<()>,
174+
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
174175
config: Arc<Config>,
175176
wallet: Arc<Wallet>,
176177
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -710,6 +711,7 @@ impl Node {
710711
};
711712

712713
let background_stop_logger = Arc::clone(&self.logger);
714+
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
713715
runtime.spawn(async move {
714716
process_events_async(
715717
background_persister,
@@ -730,6 +732,18 @@ impl Node {
730732
panic!("Failed to process events");
731733
});
732734
log_trace!(background_stop_logger, "Events processing stopped.",);
735+
736+
match event_handling_stopped_sender.send(()) {
737+
Ok(_) => (),
738+
Err(e) => {
739+
log_error!(
740+
background_stop_logger,
741+
"Failed to send 'events handling stopped' signal. This should never happen: {}",
742+
e
743+
);
744+
debug_assert!(false);
745+
},
746+
}
733747
});
734748

735749
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
@@ -779,9 +793,47 @@ impl Node {
779793
},
780794
}
781795

782-
// Stop disconnect peers.
796+
// Disconnect all peers.
783797
self.peer_manager.disconnect_all_peers();
784798

799+
// Wait until event handling stopped, at least until a timeout is reached.
800+
let event_handling_stopped_logger = Arc::clone(&self.logger);
801+
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
802+
803+
let _ = runtime
804+
.block_on(async {
805+
tokio::time::timeout(
806+
Duration::from_secs(10),
807+
event_handling_stopped_receiver.changed(),
808+
)
809+
.await
810+
})
811+
.map_err(|e| {
812+
log_error!(
813+
event_handling_stopped_logger,
814+
"Stopping event handling timed out. This should never happen: {}",
815+
e
816+
);
817+
debug_assert!(false);
818+
})
819+
.unwrap_or_else(|_| {
820+
log_error!(
821+
event_handling_stopped_logger,
822+
"Stopping event handling failed. This should never happen.",
823+
);
824+
panic!("Stopping event handling failed. This should never happen.");
825+
});
826+
827+
#[cfg(tokio_unstable)]
828+
{
829+
log_trace!(
830+
self.logger,
831+
"Active runtime tasks left prior to shutdown: {}",
832+
runtime.metrics().active_tasks_count()
833+
);
834+
}
835+
836+
// Shutdown our runtime. By now ~no or only very few tasks should be left.
785837
runtime.shutdown_timeout(Duration::from_secs(10));
786838

787839
log_info!(self.logger, "Shutdown complete.");

0 commit comments

Comments
 (0)