Skip to content

Commit f3f290b

Browse files
committed
Drop potentially dangerous sync_lock Condvar and use pub/sub model
Using a `Condvar` could be potentially dangerous in async contexts as `wait`ing on it might block the current thread potentially hosting more than one task. Here, we drop the `Condvar` and adopt a pub/sub scheme instead, similar to the one we already implemented in `ConnectionManager`.
1 parent 4ee3b4a commit f3f290b

File tree

1 file changed

+108
-54
lines changed

1 file changed

+108
-54
lines changed

src/wallet.rs

Lines changed: 108 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,16 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
3333
use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, SecretKey, Signing};
3434
use bitcoin::{ScriptBuf, Transaction, TxOut, Txid};
3535

36-
use std::ops::Deref;
37-
use std::sync::{Arc, Condvar, Mutex, RwLock};
36+
use std::mem;
37+
use std::ops::{Deref, DerefMut};
38+
use std::sync::{Arc, Mutex, RwLock};
3839
use std::time::Duration;
3940

41+
enum WalletSyncStatus {
42+
Completed,
43+
InProgress { subscribers: Vec<tokio::sync::oneshot::Sender<Result<(), Error>>> },
44+
}
45+
4046
pub struct Wallet<D, B: Deref, E: Deref, L: Deref>
4147
where
4248
D: BatchDatabase,
@@ -51,7 +57,8 @@ where
5157
// A cache storing the most recently retrieved fee rate estimations.
5258
broadcaster: B,
5359
fee_estimator: E,
54-
sync_lock: (Mutex<()>, Condvar),
60+
// A Mutex holding the current sync status.
61+
sync_status: Mutex<WalletSyncStatus>,
5562
// TODO: Drop this workaround after BDK 1.0 upgrade.
5663
balance_cache: RwLock<Balance>,
5764
logger: L,
@@ -76,69 +83,67 @@ where
7683
});
7784

7885
let inner = Mutex::new(wallet);
79-
let sync_lock = (Mutex::new(()), Condvar::new());
86+
let sync_status = Mutex::new(WalletSyncStatus::Completed);
8087
let balance_cache = RwLock::new(start_balance);
81-
Self { blockchain, inner, broadcaster, fee_estimator, sync_lock, balance_cache, logger }
88+
Self { blockchain, inner, broadcaster, fee_estimator, sync_status, balance_cache, logger }
8289
}
8390

8491
pub(crate) async fn sync(&self) -> Result<(), Error> {
85-
let (lock, cvar) = &self.sync_lock;
86-
87-
let guard = match lock.try_lock() {
88-
Ok(guard) => guard,
89-
Err(_) => {
90-
log_info!(self.logger, "Sync in progress, skipping.");
91-
let guard = cvar.wait(lock.lock().unwrap());
92-
drop(guard);
93-
cvar.notify_all();
94-
return Ok(());
95-
},
96-
};
92+
if let Some(sync_receiver) = self.register_or_subscribe_pending_sync() {
93+
log_info!(self.logger, "Sync in progress, skipping.");
94+
return sync_receiver.await.map_err(|e| {
95+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
96+
log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e);
97+
Error::WalletOperationFailed
98+
})?;
99+
}
97100

98-
let sync_options = SyncOptions { progress: None };
99-
let wallet_lock = self.inner.lock().unwrap();
100-
let res = match wallet_lock.sync(&self.blockchain, sync_options).await {
101-
Ok(()) => {
102-
// TODO: Drop this workaround after BDK 1.0 upgrade.
103-
// Update balance cache after syncing.
104-
if let Ok(balance) = wallet_lock.get_balance() {
105-
*self.balance_cache.write().unwrap() = balance;
106-
}
107-
Ok(())
108-
},
109-
Err(e) => match e {
110-
bdk::Error::Esplora(ref be) => match **be {
111-
bdk::blockchain::esplora::EsploraError::Reqwest(_) => {
112-
// Drop lock, sleep for a second, retry.
113-
drop(wallet_lock);
114-
tokio::time::sleep(Duration::from_secs(1)).await;
115-
log_error!(
116-
self.logger,
117-
"Sync failed due to HTTP connection error, retrying: {}",
118-
e
119-
);
120-
let sync_options = SyncOptions { progress: None };
121-
self.inner
122-
.lock()
123-
.unwrap()
124-
.sync(&self.blockchain, sync_options)
125-
.await
126-
.map_err(From::from)
101+
let res = {
102+
let sync_options = SyncOptions { progress: None };
103+
let wallet_lock = self.inner.lock().unwrap();
104+
match wallet_lock.sync(&self.blockchain, sync_options).await {
105+
Ok(()) => {
106+
// TODO: Drop this workaround after BDK 1.0 upgrade.
107+
// Update balance cache after syncing.
108+
if let Ok(balance) = wallet_lock.get_balance() {
109+
*self.balance_cache.write().unwrap() = balance;
110+
}
111+
Ok(())
112+
},
113+
Err(e) => match e {
114+
bdk::Error::Esplora(ref be) => match **be {
115+
bdk::blockchain::esplora::EsploraError::Reqwest(_) => {
116+
// Drop lock, sleep for a second, retry.
117+
drop(wallet_lock);
118+
tokio::time::sleep(Duration::from_secs(1)).await;
119+
log_error!(
120+
self.logger,
121+
"Sync failed due to HTTP connection error, retrying: {}",
122+
e
123+
);
124+
let sync_options = SyncOptions { progress: None };
125+
self.inner
126+
.lock()
127+
.unwrap()
128+
.sync(&self.blockchain, sync_options)
129+
.await
130+
.map_err(From::from)
131+
},
132+
_ => {
133+
log_error!(self.logger, "Sync failed due to Esplora error: {}", e);
134+
Err(From::from(e))
135+
},
127136
},
128137
_ => {
129-
log_error!(self.logger, "Sync failed due to Esplora error: {}", e);
138+
log_error!(self.logger, "Wallet sync error: {}", e);
130139
Err(From::from(e))
131140
},
132141
},
133-
_ => {
134-
log_error!(self.logger, "Wallet sync error: {}", e);
135-
Err(From::from(e))
136-
},
137-
},
142+
}
138143
};
139144

140-
drop(guard);
141-
cvar.notify_all();
145+
self.propagate_result_to_subscribers(res);
146+
142147
res
143148
}
144149

@@ -303,6 +308,55 @@ where
303308

304309
Ok(txid)
305310
}
311+
312+
fn register_or_subscribe_pending_sync(
313+
&self,
314+
) -> Option<tokio::sync::oneshot::Receiver<Result<(), Error>>> {
315+
let mut sync_status_lock = self.sync_status.lock().unwrap();
316+
match sync_status_lock.deref_mut() {
317+
WalletSyncStatus::Completed => {
318+
// We're first to register for a sync.
319+
*sync_status_lock = WalletSyncStatus::InProgress { subscribers: Vec::new() };
320+
None
321+
},
322+
WalletSyncStatus::InProgress { subscribers } => {
323+
// A sync is in-progress, we subscribe.
324+
let (tx, rx) = tokio::sync::oneshot::channel();
325+
subscribers.push(tx);
326+
Some(rx)
327+
},
328+
}
329+
}
330+
331+
fn propagate_result_to_subscribers(&self, res: Result<(), Error>) {
332+
// Send the notification to any other tasks that might be waiting on it by now.
333+
let mut waiting_subscribers = Vec::new();
334+
{
335+
let mut sync_status_lock = self.sync_status.lock().unwrap();
336+
match sync_status_lock.deref_mut() {
337+
WalletSyncStatus::Completed => {
338+
// No sync in-progress, do nothing.
339+
return;
340+
},
341+
WalletSyncStatus::InProgress { subscribers } => {
342+
// A sync is in-progress, we notify subscribers.
343+
mem::swap(&mut waiting_subscribers, subscribers);
344+
*sync_status_lock = WalletSyncStatus::Completed;
345+
},
346+
}
347+
}
348+
349+
for sender in waiting_subscribers {
350+
sender.send(res).unwrap_or_else(|e| {
351+
debug_assert!(false, "Failed to send wallet sync result to subscribers: {:?}", e);
352+
log_error!(
353+
self.logger,
354+
"Failed to send wallet sync result to subscribers: {:?}",
355+
e
356+
);
357+
});
358+
}
359+
}
306360
}
307361

308362
impl<D, B: Deref, E: Deref, L: Deref> WalletSource for Wallet<D, B, E, L>

0 commit comments

Comments
 (0)