Skip to content

Commit 84ce2f2

Browse files
committed
Persist sweeper state as part of background process
To prepare for an async kv store trait that must be awaited, this commit moves the kv store calls from the chain notification handlers to the background process. It uses a dirty flag to communicate that there is something to persist. The block height is part of the persisted data. If that data does not make it to disk, the chain notifications are replayed after restart.
1 parent c09d41c commit 84ce2f2

File tree

1 file changed

+34
-26
lines changed

1 file changed

+34
-26
lines changed

lightning/src/util/sweep.rs

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,10 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state =
386-
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } });
385+
let sweeper_state = Mutex::new(SweeperState {
386+
persistent: PersistentSweeperState { outputs, best_block },
387+
dirty: false,
388+
});
387389
Self {
388390
sweeper_state,
389391
pending_sweep: AtomicBool::new(false),
@@ -447,7 +449,10 @@ where
447449
}
448450
self.persist_state(&state_lock.persistent).map_err(|e| {
449451
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
450-
})
452+
})?;
453+
state_lock.dirty = false;
454+
455+
Ok(())
451456
}
452457

453458
/// Returns a list of the currently tracked spendable outputs.
@@ -504,12 +509,20 @@ where
504509

505510
// See if there is anything to sweep before requesting a change address.
506511
{
507-
let sweeper_state = self.sweeper_state.lock().unwrap();
512+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
508513

509514
let cur_height = sweeper_state.persistent.best_block.height;
510515
let has_respends =
511516
sweeper_state.persistent.outputs.iter().any(|o| filter_fn(o, cur_height));
512517
if !has_respends {
518+
// If there is nothing to sweep, we still persist the state if it is dirty.
519+
if sweeper_state.dirty {
520+
self.persist_state(&sweeper_state.persistent).map_err(|e| {
521+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
522+
})?;
523+
sweeper_state.dirty = false;
524+
}
525+
513526
return Ok(());
514527
}
515528
}
@@ -534,7 +547,8 @@ where
534547
.collect();
535548

536549
if respend_descriptors.is_empty() {
537-
// It could be that a tx confirmed and there is now nothing to sweep anymore.
550+
// It could be that a tx confirmed and there is now nothing to sweep anymore. If there is dirty state,
551+
// we'll persist it in the next cycle.
538552
return Ok(());
539553
}
540554

@@ -570,6 +584,7 @@ where
570584
self.persist_state(&sweeper_state.persistent).map_err(|e| {
571585
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
572586
})?;
587+
sweeper_state.dirty = false;
573588

574589
self.broadcaster.broadcast_transactions(&[&spending_tx]);
575590
}
@@ -595,6 +610,8 @@ where
595610
}
596611
true
597612
});
613+
614+
sweeper_state.dirty = true;
598615
}
599616

600617
fn persist_state(&self, sweeper_state: &PersistentSweeperState) -> Result<(), io::Error> {
@@ -648,13 +665,17 @@ where
648665
}
649666
}
650667
}
668+
669+
sweeper_state.dirty = true;
651670
}
652671

653672
fn best_block_updated_internal(
654673
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
655674
) {
656675
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height);
657676
self.prune_confirmed_outputs(sweeper_state);
677+
678+
sweeper_state.dirty = true;
658679
}
659680
}
660681

@@ -678,12 +699,8 @@ where
678699
assert_eq!(state_lock.persistent.best_block.height, height - 1,
679700
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
680701

681-
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
682-
self.best_block_updated_internal(&mut *state_lock, header, height);
683-
684-
let _ = self.persist_state(&state_lock.persistent).map_err(|e| {
685-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
686-
});
702+
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
703+
self.best_block_updated_internal(&mut state_lock, header, height);
687704
}
688705

689706
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -705,9 +722,7 @@ where
705722
}
706723
}
707724

708-
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| {
709-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
710-
});
725+
state_lock.dirty = true;
711726
}
712727
}
713728

@@ -727,9 +742,6 @@ where
727742
) {
728743
let mut state_lock = self.sweeper_state.lock().unwrap();
729744
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
730-
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| {
731-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
732-
});
733745
}
734746

735747
fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -752,18 +764,13 @@ where
752764
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
753765
.for_each(|o| o.status.unconfirmed());
754766

755-
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| {
756-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
757-
});
767+
state_lock.dirty = true;
758768
}
759769
}
760770

761771
fn best_block_updated(&self, header: &Header, height: u32) {
762772
let mut state_lock = self.sweeper_state.lock().unwrap();
763-
self.best_block_updated_internal(&mut *state_lock, header, height);
764-
let _ = self.persist_state(&state_lock.persistent).map_err(|e| {
765-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
766-
});
773+
self.best_block_updated_internal(&mut state_lock, header, height);
767774
}
768775

769776
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -792,6 +799,7 @@ where
792799
#[derive(Debug)]
793800
struct SweeperState {
794801
persistent: PersistentSweeperState,
802+
dirty: bool,
795803
}
796804

797805
#[derive(Debug, Clone)]
@@ -856,7 +864,7 @@ where
856864
}
857865
}
858866

859-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
867+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
860868
Ok(Self {
861869
sweeper_state,
862870
pending_sweep: AtomicBool::new(false),
@@ -905,7 +913,7 @@ where
905913
}
906914
}
907915

908-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
916+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
909917
Ok((
910918
best_block,
911919
OutputSweeper {

0 commit comments

Comments
 (0)