Skip to content

Commit e6ce2d9

Browse files
committed
f Switch to runtime building on initialization
1 parent ddf7a72 commit e6ce2d9

File tree

13 files changed

+168
-258
lines changed

13 files changed

+168
-258
lines changed

bindings/ldk_node.udl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,6 @@ interface LSPS1Liquidity {
252252
enum NodeError {
253253
"AlreadyRunning",
254254
"NotRunning",
255-
"RuntimeSetupFailed",
256255
"OnchainTxCreationFailed",
257256
"ConnectionFailed",
258257
"InvoiceCreationFailed",
@@ -331,6 +330,7 @@ enum BuildError {
331330
"InvalidListeningAddresses",
332331
"InvalidAnnouncementAddresses",
333332
"InvalidNodeAlias",
333+
"RuntimeSetupFailed",
334334
"ReadFailed",
335335
"WriteFailed",
336336
"StoragePathAccessFailed",

src/builder.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ pub enum BuildError {
155155
InvalidAnnouncementAddresses,
156156
/// The provided alias is invalid.
157157
InvalidNodeAlias,
158+
/// An attempt to setup a runtime has failed.
159+
RuntimeSetupFailed,
158160
/// We failed to read data from the [`KVStore`].
159161
///
160162
/// [`KVStore`]: lightning::util::persist::KVStore
@@ -192,6 +194,7 @@ impl fmt::Display for BuildError {
192194
Self::InvalidAnnouncementAddresses => {
193195
write!(f, "Given announcement addresses are invalid.")
194196
},
197+
Self::RuntimeSetupFailed => write!(f, "Failed to setup a runtime."),
195198
Self::ReadFailed => write!(f, "Failed to read from store."),
196199
Self::WriteFailed => write!(f, "Failed to write to store."),
197200
Self::StoragePathAccessFailed => write!(f, "Failed to access the given storage path."),
@@ -223,6 +226,7 @@ pub struct NodeBuilder {
223226
gossip_source_config: Option<GossipSourceConfig>,
224227
liquidity_source_config: Option<LiquiditySourceConfig>,
225228
log_writer_config: Option<LogWriterConfig>,
229+
runtime_handle: Option<tokio::runtime::Handle>,
226230
}
227231

228232
impl NodeBuilder {
@@ -239,16 +243,27 @@ impl NodeBuilder {
239243
let gossip_source_config = None;
240244
let liquidity_source_config = None;
241245
let log_writer_config = None;
246+
let runtime_handle = None;
242247
Self {
243248
config,
244249
entropy_source_config,
245250
chain_data_source_config,
246251
gossip_source_config,
247252
liquidity_source_config,
248253
log_writer_config,
254+
runtime_handle,
249255
}
250256
}
251257

258+
/// Configures the [`Node`] instance to (re-)use a specific `tokio` runtime.
259+
///
260+
/// If not provided, the node will spawn its own runtime or reuse any outer runtime context it
261+
/// can detect.
262+
pub fn set_runtime(&mut self, runtime_handle: tokio::runtime::Handle) -> &mut Self {
263+
self.runtime_handle = Some(runtime_handle);
264+
self
265+
}
266+
252267
/// Configures the [`Node`] instance to source its wallet entropy from a seed file on disk.
253268
///
254269
/// If the given file does not exist a new random seed file will be generated and
@@ -583,6 +598,15 @@ impl NodeBuilder {
583598
) -> Result<Node, BuildError> {
584599
let logger = setup_logger(&self.log_writer_config, &self.config)?;
585600

601+
let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
602+
Arc::new(Runtime::with_handle(handle.clone()))
603+
} else {
604+
Arc::new(Runtime::new().map_err(|e| {
605+
log_error!(logger, "Failed to setup tokio runtime: {}", e);
606+
BuildError::RuntimeSetupFailed
607+
})?)
608+
};
609+
586610
let seed_bytes = seed_bytes_from_config(
587611
&self.config,
588612
self.entropy_source_config.as_ref(),
@@ -611,6 +635,7 @@ impl NodeBuilder {
611635
self.gossip_source_config.as_ref(),
612636
self.liquidity_source_config.as_ref(),
613637
seed_bytes,
638+
runtime,
614639
logger,
615640
Arc::new(vss_store),
616641
)
@@ -620,6 +645,15 @@ impl NodeBuilder {
620645
pub fn build_with_store(&self, kv_store: Arc<DynStore>) -> Result<Node, BuildError> {
621646
let logger = setup_logger(&self.log_writer_config, &self.config)?;
622647

648+
let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
649+
Arc::new(Runtime::with_handle(handle.clone()))
650+
} else {
651+
Arc::new(Runtime::new().map_err(|e| {
652+
log_error!(logger, "Failed to setup tokio runtime: {}", e);
653+
BuildError::RuntimeSetupFailed
654+
})?)
655+
};
656+
623657
let seed_bytes = seed_bytes_from_config(
624658
&self.config,
625659
self.entropy_source_config.as_ref(),
@@ -633,6 +667,7 @@ impl NodeBuilder {
633667
self.gossip_source_config.as_ref(),
634668
self.liquidity_source_config.as_ref(),
635669
seed_bytes,
670+
runtime,
636671
logger,
637672
kv_store,
638673
)
@@ -935,7 +970,7 @@ fn build_with_store_internal(
935970
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
936971
gossip_source_config: Option<&GossipSourceConfig>,
937972
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
938-
logger: Arc<Logger>, kv_store: Arc<DynStore>,
973+
runtime: Arc<Runtime>, logger: Arc<Logger>, kv_store: Arc<DynStore>,
939974
) -> Result<Node, BuildError> {
940975
if let Err(err) = may_announce_channel(&config) {
941976
if config.announcement_addresses.is_some() {
@@ -1102,8 +1137,6 @@ fn build_with_store_internal(
11021137
},
11031138
};
11041139

1105-
let runtime = Arc::new(Runtime::new(Arc::clone(&logger)));
1106-
11071140
// Initialize the ChainMonitor
11081141
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
11091142
Some(Arc::clone(&chain_source)),
@@ -1496,6 +1529,8 @@ fn build_with_store_internal(
14961529
let (stop_sender, _) = tokio::sync::watch::channel(());
14971530
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
14981531

1532+
let is_running = Arc::new(RwLock::new(false));
1533+
14991534
Ok(Node {
15001535
runtime,
15011536
stop_sender,
@@ -1521,6 +1556,7 @@ fn build_with_store_internal(
15211556
scorer,
15221557
peer_store,
15231558
payment_store,
1559+
is_running,
15241560
is_listening,
15251561
node_metrics,
15261562
})

src/chain/electrum.rs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl ElectrumRuntimeClient {
8888
let now = Instant::now();
8989

9090
let tx_sync = Arc::clone(&self.tx_sync);
91-
let spawn_fut = self.runtime.spawn_blocking(move || tx_sync.sync(confirmables))?;
91+
let spawn_fut = self.runtime.spawn_blocking(move || tx_sync.sync(confirmables));
9292
let timeout_fut =
9393
tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
9494

@@ -130,7 +130,7 @@ impl ElectrumRuntimeClient {
130130
BDK_ELECTRUM_CLIENT_BATCH_SIZE,
131131
true,
132132
)
133-
})?;
133+
});
134134
let wallet_sync_timeout_fut =
135135
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
136136

@@ -159,7 +159,7 @@ impl ElectrumRuntimeClient {
159159

160160
let spawn_fut = self.runtime.spawn_blocking(move || {
161161
bdk_electrum_client.sync(request, BDK_ELECTRUM_CLIENT_BATCH_SIZE, true)
162-
})?;
162+
});
163163
let wallet_sync_timeout_fut =
164164
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
165165

@@ -185,18 +185,8 @@ impl ElectrumRuntimeClient {
185185
let txid = tx.compute_txid();
186186
let tx_bytes = tx.encode();
187187

188-
let spawn_fut = if let Ok(spawn_fut) =
189-
self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx))
190-
{
191-
spawn_fut
192-
} else {
193-
debug_assert!(
194-
false,
195-
"Failed to broadcast due to runtime being unavailable. This should never happen."
196-
);
197-
return;
198-
};
199-
188+
let spawn_fut =
189+
self.runtime.spawn_blocking(move || electrum_client.transaction_broadcast(&tx));
200190
let timeout_fut =
201191
tokio::time::timeout(Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), spawn_fut);
202192

@@ -242,7 +232,7 @@ impl ElectrumRuntimeClient {
242232
batch.estimate_fee(num_blocks);
243233
}
244234

245-
let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch))?;
235+
let spawn_fut = self.runtime.spawn_blocking(move || electrum_client.batch_call(&batch));
246236

247237
let timeout_fut = tokio::time::timeout(
248238
Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS),

src/error.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::runtime::RuntimeError;
9-
108
use bdk_chain::bitcoin::psbt::ExtractTxError as BdkExtractTxError;
119
use bdk_chain::local_chain::CannotConnectError as BdkChainConnectionError;
1210
use bdk_chain::tx_graph::CalculateFeeError as BdkChainCalculateFeeError;
@@ -22,8 +20,6 @@ pub enum Error {
2220
AlreadyRunning,
2321
/// Returned when trying to stop [`crate::Node`] while it is not running.
2422
NotRunning,
25-
/// An attempt to setup a runtime has failed.
26-
RuntimeSetupFailed,
2723
/// An on-chain transaction could not be created.
2824
OnchainTxCreationFailed,
2925
/// A network connection has been closed.
@@ -131,7 +127,6 @@ impl fmt::Display for Error {
131127
match *self {
132128
Self::AlreadyRunning => write!(f, "Node is already running."),
133129
Self::NotRunning => write!(f, "Node is not running."),
134-
Self::RuntimeSetupFailed => write!(f, "Failed to setup a runtime."),
135130
Self::OnchainTxCreationFailed => {
136131
write!(f, "On-chain transaction could not be created.")
137132
},
@@ -204,16 +199,6 @@ impl fmt::Display for Error {
204199

205200
impl std::error::Error for Error {}
206201

207-
impl From<RuntimeError> for Error {
208-
fn from(runtime_error: RuntimeError) -> Self {
209-
match runtime_error {
210-
RuntimeError::SetupFailed => Self::RuntimeSetupFailed,
211-
RuntimeError::AlreadyRunning => Self::AlreadyRunning,
212-
RuntimeError::NotRunning => Self::NotRunning,
213-
}
214-
}
215-
}
216-
217202
impl From<BdkSignerError> for Error {
218203
fn from(_: BdkSignerError) -> Self {
219204
Self::OnchainTxSigningFailed

src/event.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::io::{
2929
};
3030
use crate::logger::{log_debug, log_error, log_info, LdkLogger};
3131

32-
use crate::runtime::{Runtime, RuntimeError};
32+
use crate::runtime::Runtime;
3333

3434
use lightning::events::bump_transaction::BumpTransactionEvent;
3535
use lightning::events::{ClosureReason, PaymentPurpose, ReplayEvent};
@@ -1058,10 +1058,7 @@ where
10581058
forwarding_channel_manager.process_pending_htlc_forwards();
10591059
};
10601060

1061-
if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) {
1062-
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
1063-
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
1064-
}
1061+
self.runtime.spawn(future);
10651062
},
10661063
LdkEvent::SpendableOutputs { outputs, channel_id } => {
10671064
match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) {
@@ -1441,10 +1438,7 @@ where
14411438
}
14421439
}
14431440
};
1444-
if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) {
1445-
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
1446-
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
1447-
}
1441+
self.runtime.spawn(future);
14481442
},
14491443
LdkEvent::BumpTransaction(bte) => {
14501444
match bte {

src/gossip.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
use crate::chain::ChainSource;
99
use crate::config::RGS_SYNC_TIMEOUT_SECS;
10-
use crate::logger::{log_error, log_trace, LdkLogger, Logger};
11-
use crate::runtime::{Runtime, RuntimeError};
10+
use crate::logger::{log_trace, LdkLogger, Logger};
11+
use crate::runtime::Runtime;
1212
use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync, UtxoLookup};
1313
use crate::Error;
1414

@@ -22,7 +22,6 @@ use std::time::Duration;
2222
pub(crate) enum GossipSource {
2323
P2PNetwork {
2424
gossip_sync: Arc<P2PGossipSync>,
25-
logger: Arc<Logger>,
2625
},
2726
RapidGossipSync {
2827
gossip_sync: Arc<RapidGossipSync>,
@@ -39,7 +38,7 @@ impl GossipSource {
3938
None::<Arc<UtxoLookup>>,
4039
Arc::clone(&logger),
4140
));
42-
Self::P2PNetwork { gossip_sync, logger }
41+
Self::P2PNetwork { gossip_sync }
4342
}
4443

4544
pub fn new_rgs(
@@ -67,9 +66,9 @@ impl GossipSource {
6766
runtime: Arc<Runtime>,
6867
) {
6968
match self {
70-
Self::P2PNetwork { gossip_sync, logger } => {
69+
Self::P2PNetwork { gossip_sync } => {
7170
if let Some(utxo_source) = chain_source.as_utxo_source() {
72-
let spawner = RuntimeSpawner::new(Arc::clone(&runtime), Arc::clone(&logger));
71+
let spawner = RuntimeSpawner::new(Arc::clone(&runtime));
7372
let gossip_verifier = Arc::new(GossipVerifier::new(
7473
utxo_source,
7574
spawner,
@@ -135,20 +134,16 @@ impl GossipSource {
135134

136135
pub(crate) struct RuntimeSpawner {
137136
runtime: Arc<Runtime>,
138-
logger: Arc<Logger>,
139137
}
140138

141139
impl RuntimeSpawner {
142-
pub(crate) fn new(runtime: Arc<Runtime>, logger: Arc<Logger>) -> Self {
143-
Self { runtime, logger }
140+
pub(crate) fn new(runtime: Arc<Runtime>) -> Self {
141+
Self { runtime }
144142
}
145143
}
146144

147145
impl FutureSpawner for RuntimeSpawner {
148146
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
149-
if let Err(RuntimeError::NotRunning) = self.runtime.spawn(future) {
150-
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
151-
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
152-
}
147+
self.runtime.spawn(future);
153148
}
154149
}

0 commit comments

Comments
 (0)