Skip to content

Commit 4e305e6

Browse files
committed
Move Lightning wallet syncing to ChainSource
1 parent 94cc448 commit 4e305e6

File tree

5 files changed

+182
-150
lines changed

5 files changed

+182
-150
lines changed

src/builder.rs

Lines changed: 58 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ use lightning::util::sweep::OutputSweeper;
5151

5252
use lightning_persister::fs_store::FilesystemStore;
5353

54-
use lightning_transaction_sync::EsploraSyncClient;
55-
5654
use lightning_liquidity::lsps2::client::LSPS2ClientConfig;
5755
use lightning_liquidity::{LiquidityClientConfig, LiquidityManager};
5856

@@ -585,82 +583,71 @@ fn build_with_store_internal(
585583
})?,
586584
};
587585

588-
let (wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator) =
589-
match chain_data_source_config {
590-
Some(ChainDataSourceConfig::Esplora(server_url)) => {
591-
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
592-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
593-
let esplora_client = client_builder.build_async().unwrap();
594-
let tx_sync =
595-
Arc::new(EsploraSyncClient::from_client(esplora_client, Arc::clone(&logger)));
596-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
597-
tx_sync.client().clone(),
598-
Arc::clone(&logger),
599-
));
600-
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
601-
tx_sync.client().clone(),
602-
Arc::clone(&config),
603-
Arc::clone(&logger),
604-
));
586+
let (wallet, chain_source, tx_broadcaster, fee_estimator) = match chain_data_source_config {
587+
Some(ChainDataSourceConfig::Esplora(server_url)) => {
588+
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
589+
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
590+
let esplora_client = client_builder.build_async().unwrap();
591+
let tx_broadcaster =
592+
Arc::new(TransactionBroadcaster::new(esplora_client.clone(), Arc::clone(&logger)));
593+
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
594+
esplora_client,
595+
Arc::clone(&config),
596+
Arc::clone(&logger),
597+
));
605598

606-
let wallet = Arc::new(Wallet::new(
607-
bdk_wallet,
608-
wallet_persister,
609-
Arc::clone(&tx_broadcaster),
610-
Arc::clone(&fee_estimator),
611-
Arc::clone(&logger),
612-
));
599+
let wallet = Arc::new(Wallet::new(
600+
bdk_wallet,
601+
wallet_persister,
602+
Arc::clone(&tx_broadcaster),
603+
Arc::clone(&fee_estimator),
604+
Arc::clone(&logger),
605+
));
613606

614-
let chain_source = Arc::new(ChainSource::new_esplora(
615-
server_url.clone(),
616-
Arc::clone(&wallet),
617-
Arc::clone(&logger),
618-
));
619-
(wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator)
620-
},
621-
None => {
622-
// Default to Esplora client.
623-
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
624-
let mut client_builder = esplora_client::Builder::new(&server_url);
625-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
626-
let esplora_client = client_builder.build_async().unwrap();
627-
let tx_sync = Arc::new(EsploraSyncClient::from_client(
628-
esplora_client.clone(),
629-
Arc::clone(&logger),
630-
));
631-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
632-
tx_sync.client().clone(),
633-
Arc::clone(&logger),
634-
));
635-
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
636-
tx_sync.client().clone(),
637-
Arc::clone(&config),
638-
Arc::clone(&logger),
639-
));
607+
let chain_source = Arc::new(ChainSource::new_esplora(
608+
server_url.clone(),
609+
Arc::clone(&wallet),
610+
Arc::clone(&logger),
611+
));
612+
(wallet, chain_source, tx_broadcaster, fee_estimator)
613+
},
614+
None => {
615+
// Default to Esplora client.
616+
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
617+
let mut client_builder = esplora_client::Builder::new(&server_url);
618+
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
619+
let esplora_client = client_builder.build_async().unwrap();
620+
let tx_broadcaster =
621+
Arc::new(TransactionBroadcaster::new(esplora_client.clone(), Arc::clone(&logger)));
622+
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
623+
esplora_client,
624+
Arc::clone(&config),
625+
Arc::clone(&logger),
626+
));
640627

641-
let wallet = Arc::new(Wallet::new(
642-
bdk_wallet,
643-
wallet_persister,
644-
Arc::clone(&tx_broadcaster),
645-
Arc::clone(&fee_estimator),
646-
Arc::clone(&logger),
647-
));
628+
let wallet = Arc::new(Wallet::new(
629+
bdk_wallet,
630+
wallet_persister,
631+
Arc::clone(&tx_broadcaster),
632+
Arc::clone(&fee_estimator),
633+
Arc::clone(&logger),
634+
));
648635

649-
let chain_source = Arc::new(ChainSource::new_esplora(
650-
server_url.clone(),
651-
Arc::clone(&wallet),
652-
Arc::clone(&logger),
653-
));
636+
let chain_source = Arc::new(ChainSource::new_esplora(
637+
server_url.clone(),
638+
Arc::clone(&wallet),
639+
Arc::clone(&logger),
640+
));
654641

655-
(wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator)
656-
},
657-
};
642+
(wallet, chain_source, tx_broadcaster, fee_estimator)
643+
},
644+
};
658645

659646
let runtime = Arc::new(RwLock::new(None));
660647

661648
// Initialize the ChainMonitor
662649
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
663-
Some(Arc::clone(&tx_sync)),
650+
Some(Arc::clone(&chain_source)),
664651
Arc::clone(&tx_broadcaster),
665652
Arc::clone(&logger),
666653
Arc::clone(&fee_estimator),
@@ -876,7 +863,7 @@ fn build_with_store_internal(
876863
let liquidity_manager = Arc::new(LiquidityManager::new(
877864
Arc::clone(&keys_manager),
878865
Arc::clone(&channel_manager),
879-
Some(Arc::clone(&tx_sync)),
866+
Some(Arc::clone(&chain_source)),
880867
None,
881868
None,
882869
liquidity_client_config,
@@ -944,7 +931,7 @@ fn build_with_store_internal(
944931
let output_sweeper = match io::utils::read_output_sweeper(
945932
Arc::clone(&tx_broadcaster),
946933
Arc::clone(&fee_estimator),
947-
Arc::clone(&tx_sync),
934+
Arc::clone(&chain_source),
948935
Arc::clone(&keys_manager),
949936
Arc::clone(&kv_store),
950937
Arc::clone(&logger),
@@ -956,7 +943,7 @@ fn build_with_store_internal(
956943
channel_manager.current_best_block(),
957944
Arc::clone(&tx_broadcaster),
958945
Arc::clone(&fee_estimator),
959-
Some(Arc::clone(&tx_sync)),
946+
Some(Arc::clone(&chain_source)),
960947
Arc::clone(&keys_manager),
961948
Arc::clone(&keys_manager),
962949
Arc::clone(&kv_store),
@@ -1033,7 +1020,6 @@ fn build_with_store_internal(
10331020
config,
10341021
wallet,
10351022
chain_source,
1036-
tx_sync,
10371023
tx_broadcaster,
10381024
fee_estimator,
10391025
event_queue,

src/chain/mod.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,18 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::config::{BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS};
8+
use crate::config::{
9+
BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS,
10+
LDK_WALLET_SYNC_TIMEOUT_SECS,
11+
};
912
use crate::logger::{log_error, log_info, FilesystemLogger, Logger};
1013
use crate::types::Wallet;
1114
use crate::Error;
1215

16+
use lightning::chain::{Confirm, Filter};
17+
18+
use lightning_transaction_sync::EsploraSyncClient;
19+
1320
use bdk_esplora::EsploraAsyncExt;
1421

1522
use esplora_client::AsyncClient as EsploraAsyncClient;
@@ -81,6 +88,8 @@ pub(crate) enum ChainSource {
8188
esplora_client: EsploraAsyncClient,
8289
onchain_wallet: Arc<Wallet>,
8390
onchain_wallet_sync_status: Mutex<WalletSyncStatus>,
91+
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
92+
lightning_wallet_sync_status: Mutex<WalletSyncStatus>,
8493
logger: Arc<FilesystemLogger>,
8594
},
8695
}
@@ -92,11 +101,16 @@ impl ChainSource {
92101
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
93102
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
94103
let esplora_client = client_builder.build_async().unwrap();
104+
let tx_sync =
105+
Arc::new(EsploraSyncClient::from_client(esplora_client.clone(), Arc::clone(&logger)));
95106
let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
107+
let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
96108
Self::Esplora {
97109
esplora_client,
98110
onchain_wallet,
99111
onchain_wallet_sync_status,
112+
tx_sync,
113+
lightning_wallet_sync_status,
100114
logger,
101115
}
102116
}
@@ -166,4 +180,57 @@ impl ChainSource {
166180
},
167181
}
168182
}
183+
184+
pub(crate) async fn sync_lightning_wallet(
185+
&self, confirmables: Vec<&(dyn Confirm + Send + Sync)>,
186+
) -> Result<(), Error> {
187+
match self {
188+
Self::Esplora { tx_sync, lightning_wallet_sync_status, logger, .. } => {
189+
let receiver_res = {
190+
let mut status_lock = lightning_wallet_sync_status.lock().unwrap();
191+
status_lock.register_or_subscribe_pending_sync()
192+
};
193+
if let Some(mut sync_receiver) = receiver_res {
194+
log_info!(logger, "Sync in progress, skipping.");
195+
return sync_receiver.recv().await.map_err(|e| {
196+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
197+
log_error!(logger, "Failed to receive wallet sync result: {:?}", e);
198+
Error::WalletOperationFailed
199+
})?;
200+
}
201+
let res = {
202+
let timeout_fut = tokio::time::timeout(
203+
Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS),
204+
tx_sync.sync(confirmables),
205+
);
206+
match timeout_fut.await {
207+
Ok(res) => res.map_err(|_| Error::TxSyncFailed),
208+
Err(e) => {
209+
log_error!(logger, "Lightning wallet sync timed out: {}", e);
210+
Err(Error::TxSyncTimeout)
211+
},
212+
}
213+
};
214+
215+
lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
216+
217+
res
218+
},
219+
}
220+
}
221+
}
222+
223+
impl Filter for ChainSource {
224+
fn register_tx(&self, txid: &bitcoin::Txid, script_pubkey: &bitcoin::Script) {
225+
{
226+
match self {
227+
Self::Esplora { tx_sync, .. } => tx_sync.register_tx(txid, script_pubkey),
228+
}
229+
}
230+
}
231+
fn register_output(&self, output: lightning::chain::WatchedOutput) {
232+
match self {
233+
Self::Esplora { tx_sync, .. } => tx_sync.register_output(output),
234+
}
235+
}
169236
}

src/io/utils.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
use super::*;
99
use crate::config::WALLET_KEYS_SEED_LEN;
1010

11+
use crate::chain::ChainSource;
1112
use crate::logger::{log_error, FilesystemLogger};
1213
use crate::peer_store::PeerStore;
1314
use crate::sweep::DeprecatedSpendableOutputInfo;
14-
use crate::types::{Broadcaster, ChainSource, DynStore, FeeEstimator, KeysManager, Sweeper};
15+
use crate::types::{Broadcaster, DynStore, FeeEstimator, KeysManager, Sweeper};
1516
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
1617
use crate::{Error, EventQueue, PaymentDetails};
1718

0 commit comments

Comments
 (0)