Skip to content

Commit 360731c

Browse files
committed
Introduce Runtime object allowng to detect outer runtime context
Instead of holding an `Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>` and dealing with stuff like `tokio::task::block_in_place` at all callsites, we introduce a `Runtime` object that takes care of the state transitions, and allows to detect and reuse an outer runtime context. We also adjust the `with_runtime` API to take a `tokio::runtime::Handle` rather than an `Arc<Runtime>`.
1 parent 0a2bccd commit 360731c

File tree

14 files changed

+335
-273
lines changed

14 files changed

+335
-273
lines changed

bindings/ldk_node.udl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ dictionary LogRecord {
6464

6565
[Trait, WithForeign]
6666
interface LogWriter {
67-
void log(LogRecord record);
67+
void log(LogRecord record);
6868
};
6969

7070
interface Builder {
@@ -161,8 +161,8 @@ interface Node {
161161

162162
[Enum]
163163
interface Bolt11InvoiceDescription {
164-
Hash(string hash);
165-
Direct(string description);
164+
Hash(string hash);
165+
Direct(string description);
166166
};
167167

168168
interface Bolt11Payment {
@@ -331,6 +331,7 @@ enum BuildError {
331331
"InvalidListeningAddresses",
332332
"InvalidAnnouncementAddresses",
333333
"InvalidNodeAlias",
334+
"RuntimeSetupFailed",
334335
"ReadFailed",
335336
"WriteFailed",
336337
"StoragePathAccessFailed",

src/builder.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::liquidity::{
2828
use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger};
2929
use crate::message_handler::NodeCustomMessageHandler;
3030
use crate::peer_store::PeerStore;
31+
use crate::runtime::Runtime;
3132
use crate::tx_broadcaster::TransactionBroadcaster;
3233
use crate::types::{
3334
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
@@ -167,6 +168,8 @@ pub enum BuildError {
167168
InvalidAnnouncementAddresses,
168169
/// The provided alias is invalid.
169170
InvalidNodeAlias,
171+
/// An attempt to setup a runtime has failed.
172+
RuntimeSetupFailed,
170173
/// We failed to read data from the [`KVStore`].
171174
///
172175
/// [`KVStore`]: lightning::util::persist::KVStore
@@ -204,6 +207,7 @@ impl fmt::Display for BuildError {
204207
Self::InvalidAnnouncementAddresses => {
205208
write!(f, "Given announcement addresses are invalid.")
206209
},
210+
Self::RuntimeSetupFailed => write!(f, "Failed to setup a runtime."),
207211
Self::ReadFailed => write!(f, "Failed to read from store."),
208212
Self::WriteFailed => write!(f, "Failed to write to store."),
209213
Self::StoragePathAccessFailed => write!(f, "Failed to access the given storage path."),
@@ -235,6 +239,7 @@ pub struct NodeBuilder {
235239
gossip_source_config: Option<GossipSourceConfig>,
236240
liquidity_source_config: Option<LiquiditySourceConfig>,
237241
log_writer_config: Option<LogWriterConfig>,
242+
runtime_handle: Option<tokio::runtime::Handle>,
238243
}
239244

240245
impl NodeBuilder {
@@ -251,16 +256,28 @@ impl NodeBuilder {
251256
let gossip_source_config = None;
252257
let liquidity_source_config = None;
253258
let log_writer_config = None;
259+
let runtime_handle = None;
254260
Self {
255261
config,
256262
entropy_source_config,
257263
chain_data_source_config,
258264
gossip_source_config,
259265
liquidity_source_config,
260266
log_writer_config,
267+
runtime_handle,
261268
}
262269
}
263270

271+
/// Configures the [`Node`] instance to (re-)use a specific `tokio` runtime.
272+
///
273+
/// If not provided, the node will spawn its own runtime or reuse any outer runtime context it
274+
/// can detect.
275+
#[cfg_attr(feature = "uniffi", allow(dead_code))]
276+
pub fn set_runtime(&mut self, runtime_handle: tokio::runtime::Handle) -> &mut Self {
277+
self.runtime_handle = Some(runtime_handle);
278+
self
279+
}
280+
264281
/// Configures the [`Node`] instance to source its wallet entropy from a seed file on disk.
265282
///
266283
/// If the given file does not exist a new random seed file will be generated and
@@ -630,6 +647,15 @@ impl NodeBuilder {
630647
) -> Result<Node, BuildError> {
631648
let logger = setup_logger(&self.log_writer_config, &self.config)?;
632649

650+
let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
651+
Arc::new(Runtime::with_handle(handle.clone()))
652+
} else {
653+
Arc::new(Runtime::new().map_err(|e| {
654+
log_error!(logger, "Failed to setup tokio runtime: {}", e);
655+
BuildError::RuntimeSetupFailed
656+
})?)
657+
};
658+
633659
let seed_bytes = seed_bytes_from_config(
634660
&self.config,
635661
self.entropy_source_config.as_ref(),
@@ -658,6 +684,7 @@ impl NodeBuilder {
658684
self.gossip_source_config.as_ref(),
659685
self.liquidity_source_config.as_ref(),
660686
seed_bytes,
687+
runtime,
661688
logger,
662689
Arc::new(vss_store),
663690
)
@@ -667,6 +694,15 @@ impl NodeBuilder {
667694
pub fn build_with_store(&self, kv_store: Arc<DynStore>) -> Result<Node, BuildError> {
668695
let logger = setup_logger(&self.log_writer_config, &self.config)?;
669696

697+
let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
698+
Arc::new(Runtime::with_handle(handle.clone()))
699+
} else {
700+
Arc::new(Runtime::new().map_err(|e| {
701+
log_error!(logger, "Failed to setup tokio runtime: {}", e);
702+
BuildError::RuntimeSetupFailed
703+
})?)
704+
};
705+
670706
let seed_bytes = seed_bytes_from_config(
671707
&self.config,
672708
self.entropy_source_config.as_ref(),
@@ -680,6 +716,7 @@ impl NodeBuilder {
680716
self.gossip_source_config.as_ref(),
681717
self.liquidity_source_config.as_ref(),
682718
seed_bytes,
719+
runtime,
683720
logger,
684721
kv_store,
685722
)
@@ -1011,7 +1048,7 @@ fn build_with_store_internal(
10111048
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
10121049
gossip_source_config: Option<&GossipSourceConfig>,
10131050
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
1014-
logger: Arc<Logger>, kv_store: Arc<DynStore>,
1051+
runtime: Arc<Runtime>, logger: Arc<Logger>, kv_store: Arc<DynStore>,
10151052
) -> Result<Node, BuildError> {
10161053
if let Err(err) = may_announce_channel(&config) {
10171054
if config.announcement_addresses.is_some() {
@@ -1199,8 +1236,6 @@ fn build_with_store_internal(
11991236
},
12001237
};
12011238

1202-
let runtime = Arc::new(RwLock::new(None));
1203-
12041239
// Initialize the ChainMonitor
12051240
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
12061241
Some(Arc::clone(&chain_source)),
@@ -1593,6 +1628,8 @@ fn build_with_store_internal(
15931628
let (stop_sender, _) = tokio::sync::watch::channel(());
15941629
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
15951630

1631+
let is_running = Arc::new(RwLock::new(false));
1632+
15961633
Ok(Node {
15971634
runtime,
15981635
stop_sender,
@@ -1618,6 +1655,7 @@ fn build_with_store_internal(
16181655
scorer,
16191656
peer_store,
16201657
payment_store,
1658+
is_running,
16211659
is_listening,
16221660
node_metrics,
16231661
})

src/chain/electrum.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::fee_estimator::{
1515
ConfirmationTarget,
1616
};
1717
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
18+
use crate::runtime::Runtime;
1819

1920
use lightning::chain::{Confirm, Filter, WatchedOutput};
2021
use lightning::util::ser::Writeable;
@@ -46,15 +47,14 @@ pub(crate) struct ElectrumRuntimeClient {
4647
electrum_client: Arc<ElectrumClient>,
4748
bdk_electrum_client: Arc<BdkElectrumClient<ElectrumClient>>,
4849
tx_sync: Arc<ElectrumSyncClient<Arc<Logger>>>,
49-
runtime: Arc<tokio::runtime::Runtime>,
50+
runtime: Arc<Runtime>,
5051
config: Arc<Config>,
5152
logger: Arc<Logger>,
5253
}
5354

5455
impl ElectrumRuntimeClient {
5556
pub(crate) fn new(
56-
server_url: String, runtime: Arc<tokio::runtime::Runtime>, config: Arc<Config>,
57-
logger: Arc<Logger>,
57+
server_url: String, runtime: Arc<Runtime>, config: Arc<Config>, logger: Arc<Logger>,
5858
) -> Result<Self, Error> {
5959
let electrum_config = ElectrumConfigBuilder::new()
6060
.retry(ELECTRUM_CLIENT_NUM_RETRIES)
@@ -187,7 +187,6 @@ impl ElectrumRuntimeClient {
187187

188188
let spawn_fut =
189189
self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx));
190-
191190
let timeout_fut =
192191
tokio::time::timeout(Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), spawn_fut);
193192

src/chain/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::fee_estimator::{
2525
};
2626
use crate::io::utils::write_node_metrics;
2727
use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger};
28+
use crate::runtime::Runtime;
2829
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2930
use crate::{Error, NodeMetrics};
3031

@@ -127,7 +128,7 @@ impl ElectrumRuntimeStatus {
127128
}
128129

129130
pub(crate) fn start(
130-
&mut self, server_url: String, runtime: Arc<tokio::runtime::Runtime>, config: Arc<Config>,
131+
&mut self, server_url: String, runtime: Arc<Runtime>, config: Arc<Config>,
131132
logger: Arc<Logger>,
132133
) -> Result<(), Error> {
133134
match self {
@@ -359,7 +360,7 @@ impl ChainSource {
359360
}
360361
}
361362

362-
pub(crate) fn start(&self, runtime: Arc<tokio::runtime::Runtime>) -> Result<(), Error> {
363+
pub(crate) fn start(&self, runtime: Arc<Runtime>) -> Result<(), Error> {
363364
match self {
364365
Self::Electrum { server_url, electrum_runtime_status, config, logger, .. } => {
365366
electrum_runtime_status.write().unwrap().start(

src/event.rs

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use crate::io::{
2929
};
3030
use crate::logger::{log_debug, log_error, log_info, LdkLogger};
3131

32+
use crate::runtime::Runtime;
33+
3234
use lightning::events::bump_transaction::BumpTransactionEvent;
3335
use lightning::events::{ClosureReason, PaymentPurpose, ReplayEvent};
3436
use lightning::events::{Event as LdkEvent, PaymentFailureReason};
@@ -53,7 +55,7 @@ use core::future::Future;
5355
use core::task::{Poll, Waker};
5456
use std::collections::VecDeque;
5557
use std::ops::Deref;
56-
use std::sync::{Arc, Condvar, Mutex, RwLock};
58+
use std::sync::{Arc, Condvar, Mutex};
5759
use std::time::Duration;
5860

5961
/// An event emitted by [`Node`], which should be handled by the user.
@@ -451,7 +453,7 @@ where
451453
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
452454
payment_store: Arc<PaymentStore>,
453455
peer_store: Arc<PeerStore<L>>,
454-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
456+
runtime: Arc<Runtime>,
455457
logger: L,
456458
config: Arc<Config>,
457459
}
@@ -466,8 +468,8 @@ where
466468
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
467469
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
468470
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
469-
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
470-
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: L, config: Arc<Config>,
471+
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>, runtime: Arc<Runtime>,
472+
logger: L, config: Arc<Config>,
471473
) -> Self {
472474
Self {
473475
event_queue,
@@ -1049,17 +1051,14 @@ where
10491051
let forwarding_channel_manager = self.channel_manager.clone();
10501052
let min = time_forwardable.as_millis() as u64;
10511053

1052-
let runtime_lock = self.runtime.read().unwrap();
1053-
debug_assert!(runtime_lock.is_some());
1054+
let future = async move {
1055+
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
1056+
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
10541057

1055-
if let Some(runtime) = runtime_lock.as_ref() {
1056-
runtime.spawn(async move {
1057-
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
1058-
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
1058+
forwarding_channel_manager.process_pending_htlc_forwards();
1059+
};
10591060

1060-
forwarding_channel_manager.process_pending_htlc_forwards();
1061-
});
1062-
}
1061+
self.runtime.spawn(future);
10631062
},
10641063
LdkEvent::SpendableOutputs { outputs, channel_id } => {
10651064
match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) {
@@ -1421,31 +1420,27 @@ where
14211420
debug_assert!(false, "We currently don't handle BOLT12 invoices manually, so this event should never be emitted.");
14221421
},
14231422
LdkEvent::ConnectionNeeded { node_id, addresses } => {
1424-
let runtime_lock = self.runtime.read().unwrap();
1425-
debug_assert!(runtime_lock.is_some());
1426-
1427-
if let Some(runtime) = runtime_lock.as_ref() {
1428-
let spawn_logger = self.logger.clone();
1429-
let spawn_cm = Arc::clone(&self.connection_manager);
1430-
runtime.spawn(async move {
1431-
for addr in &addresses {
1432-
match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1433-
Ok(()) => {
1434-
return;
1435-
},
1436-
Err(e) => {
1437-
log_error!(
1438-
spawn_logger,
1439-
"Failed to establish connection to peer {}@{}: {}",
1440-
node_id,
1441-
addr,
1442-
e
1443-
);
1444-
},
1445-
}
1423+
let spawn_logger = self.logger.clone();
1424+
let spawn_cm = Arc::clone(&self.connection_manager);
1425+
let future = async move {
1426+
for addr in &addresses {
1427+
match spawn_cm.connect_peer_if_necessary(node_id, addr.clone()).await {
1428+
Ok(()) => {
1429+
return;
1430+
},
1431+
Err(e) => {
1432+
log_error!(
1433+
spawn_logger,
1434+
"Failed to establish connection to peer {}@{}: {}",
1435+
node_id,
1436+
addr,
1437+
e
1438+
);
1439+
},
14461440
}
1447-
});
1448-
}
1441+
}
1442+
};
1443+
self.runtime.spawn(future);
14491444
},
14501445
LdkEvent::BumpTransaction(bte) => {
14511446
match bte {

0 commit comments

Comments
 (0)