Skip to content

Commit e3279d7

Browse files
committed
Drop potentially dangerous sync_lock Condvar and use pub/sub model
Using a `Condvar` could be potentially dangerous in async contexts as `wait`ing on it might block the current thread potentially hosting more than one task. Here, we drop the `Condvar` and adopt a pub/sub scheme instead, similar to the one we already implemented in `ConnectionManager`.
1 parent 80d24c3 commit e3279d7

File tree

1 file changed

+110
-54
lines changed

1 file changed

+110
-54
lines changed

src/wallet.rs

Lines changed: 110 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,15 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
3333
use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, SecretKey, Signing};
3434
use bitcoin::{ScriptBuf, Transaction, TxOut, Txid};
3535

36-
use std::ops::Deref;
37-
use std::sync::{Arc, Condvar, Mutex, RwLock};
36+
use std::ops::{Deref, DerefMut};
37+
use std::sync::{Arc, Mutex, RwLock};
3838
use std::time::Duration;
3939

40+
enum WalletSyncStatus {
41+
Completed,
42+
InProgress { subscribers: tokio::sync::broadcast::Sender<Result<(), Error>> },
43+
}
44+
4045
pub struct Wallet<D, B: Deref, E: Deref, L: Deref>
4146
where
4247
D: BatchDatabase,
@@ -51,7 +56,8 @@ where
5156
// A cache storing the most recently retrieved fee rate estimations.
5257
broadcaster: B,
5358
fee_estimator: E,
54-
sync_lock: (Mutex<()>, Condvar),
59+
// A Mutex holding the current sync status.
60+
sync_status: Mutex<WalletSyncStatus>,
5561
// TODO: Drop this workaround after BDK 1.0 upgrade.
5662
balance_cache: RwLock<Balance>,
5763
logger: L,
@@ -76,69 +82,66 @@ where
7682
});
7783

7884
let inner = Mutex::new(wallet);
79-
let sync_lock = (Mutex::new(()), Condvar::new());
85+
let sync_status = Mutex::new(WalletSyncStatus::Completed);
8086
let balance_cache = RwLock::new(start_balance);
81-
Self { blockchain, inner, broadcaster, fee_estimator, sync_lock, balance_cache, logger }
87+
Self { blockchain, inner, broadcaster, fee_estimator, sync_status, balance_cache, logger }
8288
}
8389

8490
pub(crate) async fn sync(&self) -> Result<(), Error> {
85-
let (lock, cvar) = &self.sync_lock;
86-
87-
let guard = match lock.try_lock() {
88-
Ok(guard) => guard,
89-
Err(_) => {
90-
log_info!(self.logger, "Sync in progress, skipping.");
91-
let guard = cvar.wait(lock.lock().unwrap());
92-
drop(guard);
93-
cvar.notify_all();
94-
return Ok(());
95-
},
96-
};
91+
if let Some(mut sync_receiver) = self.register_or_subscribe_pending_sync() {
92+
log_info!(self.logger, "Sync in progress, skipping.");
93+
return sync_receiver.recv().await.map_err(|e| {
94+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
95+
log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e);
96+
Error::WalletOperationFailed
97+
})?;
98+
}
9799

98-
let sync_options = SyncOptions { progress: None };
99-
let wallet_lock = self.inner.lock().unwrap();
100-
let res = match wallet_lock.sync(&self.blockchain, sync_options).await {
101-
Ok(()) => {
102-
// TODO: Drop this workaround after BDK 1.0 upgrade.
103-
// Update balance cache after syncing.
104-
if let Ok(balance) = wallet_lock.get_balance() {
105-
*self.balance_cache.write().unwrap() = balance;
106-
}
107-
Ok(())
108-
},
109-
Err(e) => match e {
110-
bdk::Error::Esplora(ref be) => match **be {
111-
bdk::blockchain::esplora::EsploraError::Reqwest(_) => {
112-
// Drop lock, sleep for a second, retry.
113-
drop(wallet_lock);
114-
tokio::time::sleep(Duration::from_secs(1)).await;
115-
log_error!(
116-
self.logger,
117-
"Sync failed due to HTTP connection error, retrying: {}",
118-
e
119-
);
120-
let sync_options = SyncOptions { progress: None };
121-
self.inner
122-
.lock()
123-
.unwrap()
124-
.sync(&self.blockchain, sync_options)
125-
.await
126-
.map_err(From::from)
100+
let res = {
101+
let wallet_lock = self.inner.lock().unwrap();
102+
match wallet_lock.sync(&self.blockchain, SyncOptions { progress: None }).await {
103+
Ok(()) => {
104+
// TODO: Drop this workaround after BDK 1.0 upgrade.
105+
// Update balance cache after syncing.
106+
if let Ok(balance) = wallet_lock.get_balance() {
107+
*self.balance_cache.write().unwrap() = balance;
108+
}
109+
Ok(())
110+
},
111+
Err(e) => match e {
112+
bdk::Error::Esplora(ref be) => match **be {
113+
bdk::blockchain::esplora::EsploraError::Reqwest(_) => {
114+
// Drop lock, sleep for a second, retry.
115+
drop(wallet_lock);
116+
tokio::time::sleep(Duration::from_secs(1)).await;
117+
log_error!(
118+
self.logger,
119+
"Sync failed due to HTTP connection error, retrying: {}",
120+
e
121+
);
122+
let sync_options = SyncOptions { progress: None };
123+
self.inner
124+
.lock()
125+
.unwrap()
126+
.sync(&self.blockchain, sync_options)
127+
.await
128+
.map_err(From::from)
129+
},
130+
_ => {
131+
log_error!(self.logger, "Sync failed due to Esplora error: {}", e);
132+
Err(From::from(e))
133+
},
127134
},
128135
_ => {
129-
log_error!(self.logger, "Sync failed due to Esplora error: {}", e);
136+
log_error!(self.logger, "Wallet sync error: {}", e);
130137
Err(From::from(e))
131138
},
132139
},
133-
_ => {
134-
log_error!(self.logger, "Wallet sync error: {}", e);
135-
Err(From::from(e))
136-
},
137-
},
140+
}
138141
};
139142

140-
drop(guard);
141-
cvar.notify_all();
143+
self.propagate_result_to_subscribers(res);
144+
142145
res
143146
}
144147

@@ -303,6 +306,59 @@ where
303306

304307
Ok(txid)
305308
}
309+
310+
fn register_or_subscribe_pending_sync(
311+
&self,
312+
) -> Option<tokio::sync::broadcast::Receiver<Result<(), Error>>> {
313+
let mut sync_status_lock = self.sync_status.lock().unwrap();
314+
match sync_status_lock.deref_mut() {
315+
WalletSyncStatus::Completed => {
316+
// We're first to register for a sync.
317+
let (tx, _) = tokio::sync::broadcast::channel(1);
318+
*sync_status_lock = WalletSyncStatus::InProgress { subscribers: tx };
319+
None
320+
},
321+
WalletSyncStatus::InProgress { subscribers } => {
322+
// A sync is in-progress, we subscribe.
323+
let rx = subscribers.subscribe();
324+
Some(rx)
325+
},
326+
}
327+
}
328+
329+
fn propagate_result_to_subscribers(&self, res: Result<(), Error>) {
330+
// Send the notification to any other tasks that might be waiting on it by now.
331+
{
332+
let mut sync_status_lock = self.sync_status.lock().unwrap();
333+
match sync_status_lock.deref_mut() {
334+
WalletSyncStatus::Completed => {
335+
// No sync in-progress, do nothing.
336+
return;
337+
},
338+
WalletSyncStatus::InProgress { subscribers } => {
339+
// A sync is in-progress, we notify subscribers.
340+
if subscribers.receiver_count() > 0 {
341+
match subscribers.send(res) {
342+
Ok(_) => (),
343+
Err(e) => {
344+
debug_assert!(
345+
false,
346+
"Failed to send wallet sync result to subscribers: {:?}",
347+
e
348+
);
349+
log_error!(
350+
self.logger,
351+
"Failed to send wallet sync result to subscribers: {:?}",
352+
e
353+
);
354+
},
355+
}
356+
}
357+
*sync_status_lock = WalletSyncStatus::Completed;
358+
},
359+
}
360+
}
361+
}
306362
}
307363

308364
impl<D, B: Deref, E: Deref, L: Deref> WalletSource for Wallet<D, B, E, L>

0 commit comments

Comments
 (0)