Skip to content

Commit 94cc448

Browse files
committed
Move on-chain syncing to ChainSource
1 parent a3f0603 commit 94cc448

File tree

4 files changed

+240
-202
lines changed

4 files changed

+240
-202
lines changed

src/builder.rs

Lines changed: 72 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::chain::{DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL};
8+
use crate::chain::{ChainSource, DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL};
99
use crate::config::{default_user_config, Config, WALLET_KEYS_SEED_LEN};
1010

1111
use crate::connection::ConnectionManager;
@@ -585,58 +585,78 @@ fn build_with_store_internal(
585585
})?,
586586
};
587587

588-
let (esplora_client, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config {
589-
Some(ChainDataSourceConfig::Esplora(server_url)) => {
590-
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
591-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
592-
let esplora_client = client_builder.build_async().unwrap();
593-
let tx_sync = Arc::new(EsploraSyncClient::from_client(
594-
esplora_client.clone(),
595-
Arc::clone(&logger),
596-
));
597-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
598-
tx_sync.client().clone(),
599-
Arc::clone(&logger),
600-
));
601-
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
602-
tx_sync.client().clone(),
603-
Arc::clone(&config),
604-
Arc::clone(&logger),
605-
));
606-
(esplora_client, tx_sync, tx_broadcaster, fee_estimator)
607-
},
608-
None => {
609-
// Default to Esplora client.
610-
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
611-
let mut client_builder = esplora_client::Builder::new(&server_url);
612-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
613-
let esplora_client = client_builder.build_async().unwrap();
614-
let tx_sync = Arc::new(EsploraSyncClient::from_client(
615-
esplora_client.clone(),
616-
Arc::clone(&logger),
617-
));
618-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
619-
tx_sync.client().clone(),
620-
Arc::clone(&logger),
621-
));
622-
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
623-
tx_sync.client().clone(),
624-
Arc::clone(&config),
625-
Arc::clone(&logger),
626-
));
627-
(esplora_client, tx_sync, tx_broadcaster, fee_estimator)
628-
},
629-
};
588+
let (wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator) =
589+
match chain_data_source_config {
590+
Some(ChainDataSourceConfig::Esplora(server_url)) => {
591+
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
592+
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
593+
let esplora_client = client_builder.build_async().unwrap();
594+
let tx_sync =
595+
Arc::new(EsploraSyncClient::from_client(esplora_client, Arc::clone(&logger)));
596+
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
597+
tx_sync.client().clone(),
598+
Arc::clone(&logger),
599+
));
600+
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
601+
tx_sync.client().clone(),
602+
Arc::clone(&config),
603+
Arc::clone(&logger),
604+
));
605+
606+
let wallet = Arc::new(Wallet::new(
607+
bdk_wallet,
608+
wallet_persister,
609+
Arc::clone(&tx_broadcaster),
610+
Arc::clone(&fee_estimator),
611+
Arc::clone(&logger),
612+
));
613+
614+
let chain_source = Arc::new(ChainSource::new_esplora(
615+
server_url.clone(),
616+
Arc::clone(&wallet),
617+
Arc::clone(&logger),
618+
));
619+
(wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator)
620+
},
621+
None => {
622+
// Default to Esplora client.
623+
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
624+
let mut client_builder = esplora_client::Builder::new(&server_url);
625+
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
626+
let esplora_client = client_builder.build_async().unwrap();
627+
let tx_sync = Arc::new(EsploraSyncClient::from_client(
628+
esplora_client.clone(),
629+
Arc::clone(&logger),
630+
));
631+
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
632+
tx_sync.client().clone(),
633+
Arc::clone(&logger),
634+
));
635+
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
636+
tx_sync.client().clone(),
637+
Arc::clone(&config),
638+
Arc::clone(&logger),
639+
));
640+
641+
let wallet = Arc::new(Wallet::new(
642+
bdk_wallet,
643+
wallet_persister,
644+
Arc::clone(&tx_broadcaster),
645+
Arc::clone(&fee_estimator),
646+
Arc::clone(&logger),
647+
));
648+
649+
let chain_source = Arc::new(ChainSource::new_esplora(
650+
server_url.clone(),
651+
Arc::clone(&wallet),
652+
Arc::clone(&logger),
653+
));
654+
655+
(wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator)
656+
},
657+
};
630658

631659
let runtime = Arc::new(RwLock::new(None));
632-
let wallet = Arc::new(Wallet::new(
633-
bdk_wallet,
634-
wallet_persister,
635-
esplora_client,
636-
Arc::clone(&tx_broadcaster),
637-
Arc::clone(&fee_estimator),
638-
Arc::clone(&logger),
639-
));
640660

641661
// Initialize the ChainMonitor
642662
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
@@ -1012,6 +1032,7 @@ fn build_with_store_internal(
10121032
event_handling_stopped_sender,
10131033
config,
10141034
wallet,
1035+
chain_source,
10151036
tx_sync,
10161037
tx_broadcaster,
10171038
fee_estimator,

src/chain/mod.rs

Lines changed: 137 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,165 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::logger::FilesystemLogger;
8+
use crate::config::{BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS};
9+
use crate::logger::{log_error, log_info, FilesystemLogger, Logger};
10+
use crate::types::Wallet;
11+
use crate::Error;
12+
13+
use bdk_esplora::EsploraAsyncExt;
914

1015
use esplora_client::AsyncClient as EsploraAsyncClient;
1116

12-
use std::sync::Arc;
17+
use std::sync::{Arc, Mutex};
18+
use std::time::Duration;
1319

1420
// The default Esplora server we're using.
1521
pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api";
1622

1723
// The default Esplora client timeout we're using.
1824
pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10;
1925

26+
pub(crate) enum WalletSyncStatus {
27+
Completed,
28+
InProgress { subscribers: tokio::sync::broadcast::Sender<Result<(), Error>> },
29+
}
30+
31+
impl WalletSyncStatus {
32+
fn register_or_subscribe_pending_sync(
33+
&mut self,
34+
) -> Option<tokio::sync::broadcast::Receiver<Result<(), Error>>> {
35+
match self {
36+
WalletSyncStatus::Completed => {
37+
// We're first to register for a sync.
38+
let (tx, _) = tokio::sync::broadcast::channel(1);
39+
*self = WalletSyncStatus::InProgress { subscribers: tx };
40+
None
41+
},
42+
WalletSyncStatus::InProgress { subscribers } => {
43+
// A sync is in-progress, we subscribe.
44+
let rx = subscribers.subscribe();
45+
Some(rx)
46+
},
47+
}
48+
}
49+
50+
fn propagate_result_to_subscribers(&mut self, res: Result<(), Error>) {
51+
// Send the notification to any other tasks that might be waiting on it by now.
52+
{
53+
match self {
54+
WalletSyncStatus::Completed => {
55+
// No sync in-progress, do nothing.
56+
return;
57+
},
58+
WalletSyncStatus::InProgress { subscribers } => {
59+
// A sync is in-progress, we notify subscribers.
60+
if subscribers.receiver_count() > 0 {
61+
match subscribers.send(res) {
62+
Ok(_) => (),
63+
Err(e) => {
64+
debug_assert!(
65+
false,
66+
"Failed to send wallet sync result to subscribers: {:?}",
67+
e
68+
);
69+
},
70+
}
71+
}
72+
*self = WalletSyncStatus::Completed;
73+
},
74+
}
75+
}
76+
}
77+
}
78+
2079
pub(crate) enum ChainSource {
2180
Esplora {
2281
esplora_client: EsploraAsyncClient,
82+
onchain_wallet: Arc<Wallet>,
83+
onchain_wallet_sync_status: Mutex<WalletSyncStatus>,
2384
logger: Arc<FilesystemLogger>,
2485
},
2586
}
2687

2788
impl ChainSource {
2889
pub(crate) fn new_esplora(
29-
server_url: String,
30-
logger: Arc<FilesystemLogger>,
90+
server_url: String, onchain_wallet: Arc<Wallet>, logger: Arc<FilesystemLogger>,
3191
) -> Self {
3292
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
3393
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
3494
let esplora_client = client_builder.build_async().unwrap();
35-
Self::Esplora { esplora_client, logger }
95+
let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
96+
Self::Esplora {
97+
esplora_client,
98+
onchain_wallet,
99+
onchain_wallet_sync_status,
100+
logger,
101+
}
102+
}
103+
104+
pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> {
105+
match self {
106+
Self::Esplora {
107+
esplora_client,
108+
onchain_wallet,
109+
onchain_wallet_sync_status,
110+
logger,
111+
..
112+
} => {
113+
let receiver_res = {
114+
let mut status_lock = onchain_wallet_sync_status.lock().unwrap();
115+
status_lock.register_or_subscribe_pending_sync()
116+
};
117+
if let Some(mut sync_receiver) = receiver_res {
118+
log_info!(logger, "Sync in progress, skipping.");
119+
return sync_receiver.recv().await.map_err(|e| {
120+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
121+
log_error!(logger, "Failed to receive wallet sync result: {:?}", e);
122+
Error::WalletOperationFailed
123+
})?;
124+
}
125+
126+
let res = {
127+
let full_scan_request = onchain_wallet.get_full_scan_request();
128+
129+
let wallet_sync_timeout_fut = tokio::time::timeout(
130+
Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS),
131+
esplora_client.full_scan(
132+
full_scan_request,
133+
BDK_CLIENT_STOP_GAP,
134+
BDK_CLIENT_CONCURRENCY,
135+
),
136+
);
137+
138+
match wallet_sync_timeout_fut.await {
139+
Ok(res) => match res {
140+
Ok(update) => onchain_wallet.apply_update(update),
141+
Err(e) => match *e {
142+
esplora_client::Error::Reqwest(he) => {
143+
log_error!(
144+
logger,
145+
"Sync failed due to HTTP connection error: {}",
146+
he
147+
);
148+
Err(Error::WalletOperationFailed)
149+
},
150+
_ => {
151+
log_error!(logger, "Sync failed due to Esplora error: {}", e);
152+
Err(Error::WalletOperationFailed)
153+
},
154+
},
155+
},
156+
Err(e) => {
157+
log_error!(logger, "On-chain wallet sync timed out: {}", e);
158+
Err(Error::WalletOperationTimeout)
159+
},
160+
}
161+
};
162+
163+
onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
164+
165+
res
166+
},
167+
}
36168
}
37169
}

src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ use config::{
127127
WALLET_SYNC_INTERVAL_MINIMUM_SECS,
128128
};
129129
use connection::ConnectionManager;
130+
use chain::ChainSource;
130131
use event::{EventHandler, EventQueue};
131132
use gossip::GossipSource;
132133
use graph::NetworkGraph;
@@ -180,6 +181,7 @@ pub struct Node {
180181
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
181182
config: Arc<Config>,
182183
wallet: Arc<Wallet>,
184+
chain_source: Arc<ChainSource>,
183185
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
184186
tx_broadcaster: Arc<Broadcaster>,
185187
fee_estimator: Arc<FeeEstimator>,
@@ -274,7 +276,7 @@ impl Node {
274276
})?;
275277

276278
// Setup wallet sync
277-
let wallet = Arc::clone(&self.wallet);
279+
let chain_source = Arc::clone(&self.chain_source);
278280
let sync_logger = Arc::clone(&self.logger);
279281
let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp);
280282
let mut stop_sync = self.stop_sender.subscribe();
@@ -298,7 +300,7 @@ impl Node {
298300
}
299301
_ = onchain_wallet_sync_interval.tick() => {
300302
let now = Instant::now();
301-
match wallet.sync().await {
303+
match chain_source.sync_onchain_wallet().await {
302304
Ok(()) => {
303305
log_trace!(
304306
sync_logger,
@@ -1368,7 +1370,7 @@ impl Node {
13681370
return Err(Error::NotRunning);
13691371
}
13701372

1371-
let wallet = Arc::clone(&self.wallet);
1373+
let chain_source = Arc::clone(&self.chain_source);
13721374
let tx_sync = Arc::clone(&self.tx_sync);
13731375
let sync_cman = Arc::clone(&self.channel_manager);
13741376
let archive_cman = Arc::clone(&self.channel_manager);
@@ -1394,7 +1396,7 @@ impl Node {
13941396
let now = Instant::now();
13951397
// We don't add an additional timeout here, as `Wallet::sync` already returns
13961398
// after a timeout.
1397-
match wallet.sync().await {
1399+
match chain_source.sync_onchain_wallet().await {
13981400
Ok(()) => {
13991401
log_info!(
14001402
sync_logger,

0 commit comments

Comments
 (0)