@@ -33,14 +33,13 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
33
33
use bitcoin:: secp256k1:: { PublicKey , Scalar , Secp256k1 , SecretKey , Signing } ;
34
34
use bitcoin:: { ScriptBuf , Transaction , TxOut , Txid } ;
35
35
36
- use std:: mem;
37
36
use std:: ops:: { Deref , DerefMut } ;
38
37
use std:: sync:: { Arc , Mutex , RwLock } ;
39
38
use std:: time:: Duration ;
40
39
41
40
enum WalletSyncStatus {
42
41
Completed ,
43
- InProgress { subscribers : Vec < tokio:: sync:: oneshot :: Sender < Result < ( ) , Error > > > } ,
42
+ InProgress { subscribers : tokio:: sync:: broadcast :: Sender < Result < ( ) , Error > > } ,
44
43
}
45
44
46
45
pub struct Wallet < D , B : Deref , E : Deref , L : Deref >
89
88
}
90
89
91
90
pub ( crate ) async fn sync ( & self ) -> Result < ( ) , Error > {
92
- if let Some ( sync_receiver) = self . register_or_subscribe_pending_sync ( ) {
91
+ if let Some ( mut sync_receiver) = self . register_or_subscribe_pending_sync ( ) {
93
92
log_info ! ( self . logger, "Sync in progress, skipping." ) ;
94
- return sync_receiver. await . map_err ( |e| {
93
+ return sync_receiver. recv ( ) . await . map_err ( |e| {
95
94
debug_assert ! ( false , "Failed to receive wallet sync result: {:?}" , e) ;
96
95
log_error ! ( self . logger, "Failed to receive wallet sync result: {:?}" , e) ;
97
96
Error :: WalletOperationFailed
@@ -310,26 +309,25 @@ where
310
309
311
310
fn register_or_subscribe_pending_sync (
312
311
& self ,
313
- ) -> Option < tokio:: sync:: oneshot :: Receiver < Result < ( ) , Error > > > {
312
+ ) -> Option < tokio:: sync:: broadcast :: Receiver < Result < ( ) , Error > > > {
314
313
let mut sync_status_lock = self . sync_status . lock ( ) . unwrap ( ) ;
315
314
match sync_status_lock. deref_mut ( ) {
316
315
WalletSyncStatus :: Completed => {
317
316
// We're first to register for a sync.
318
- * sync_status_lock = WalletSyncStatus :: InProgress { subscribers : Vec :: new ( ) } ;
317
+ let ( tx, _) = tokio:: sync:: broadcast:: channel ( 1 ) ;
318
+ * sync_status_lock = WalletSyncStatus :: InProgress { subscribers : tx } ;
319
319
None
320
320
} ,
321
321
WalletSyncStatus :: InProgress { subscribers } => {
322
322
// A sync is in-progress, we subscribe.
323
- let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
324
- subscribers. push ( tx) ;
323
+ let rx = subscribers. subscribe ( ) ;
325
324
Some ( rx)
326
325
} ,
327
326
}
328
327
}
329
328
330
329
fn propagate_result_to_subscribers ( & self , res : Result < ( ) , Error > ) {
331
330
// Send the notification to any other tasks that might be waiting on it by now.
332
- let mut waiting_subscribers = Vec :: new ( ) ;
333
331
{
334
332
let mut sync_status_lock = self . sync_status . lock ( ) . unwrap ( ) ;
335
333
match sync_status_lock. deref_mut ( ) {
@@ -339,22 +337,27 @@ where
339
337
} ,
340
338
WalletSyncStatus :: InProgress { subscribers } => {
341
339
// A sync is in-progress, we notify subscribers.
342
- mem:: swap ( & mut waiting_subscribers, subscribers) ;
340
+ if subscribers. receiver_count ( ) > 0 {
341
+ match subscribers. send ( res) {
342
+ Ok ( _) => ( ) ,
343
+ Err ( e) => {
344
+ debug_assert ! (
345
+ false ,
346
+ "Failed to send wallet sync result to subscribers: {:?}" ,
347
+ e
348
+ ) ;
349
+ log_error ! (
350
+ self . logger,
351
+ "Failed to send wallet sync result to subscribers: {:?}" ,
352
+ e
353
+ ) ;
354
+ } ,
355
+ }
356
+ }
343
357
* sync_status_lock = WalletSyncStatus :: Completed ;
344
358
} ,
345
359
}
346
360
}
347
-
348
- for sender in waiting_subscribers {
349
- sender. send ( res) . unwrap_or_else ( |e| {
350
- debug_assert ! ( false , "Failed to send wallet sync result to subscribers: {:?}" , e) ;
351
- log_error ! (
352
- self . logger,
353
- "Failed to send wallet sync result to subscribers: {:?}" ,
354
- e
355
- ) ;
356
- } ) ;
357
- }
358
361
}
359
362
}
360
363
0 commit comments