Skip to content

Commit 67d9f8f

Browse files
committed
Persist sweeper state as part of background process
To prepare for an async kv store trait that must be awaited, this commit moves the kv store calls from the chain notification handlers to the background process. It uses a dirty flag to communicate that there is something to persist. The block height is part of the persisted data. If that data does not make it to disk, the chain notifications are replayed after restart.
1 parent 8e3264c commit 67d9f8f

File tree

1 file changed

+36
-31
lines changed

1 file changed

+36
-31
lines changed

lightning/src/util/sweep.rs

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,10 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state =
386-
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } });
385+
let sweeper_state = Mutex::new(SweeperState {
386+
persistent: PersistentSweeperState { outputs, best_block },
387+
dirty: false,
388+
});
387389
Self {
388390
sweeper_state,
389391
pending_sweep: AtomicBool::new(false),
@@ -445,9 +447,9 @@ where
445447

446448
state_lock.persistent.outputs.push(output_info);
447449
}
448-
self.persist_state(&state_lock).map_err(|e| {
449-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
450-
})
450+
self.persist_state(&state_lock.persistent)
451+
.inspect(|_| state_lock.dirty = false)
452+
.map_err(|e| log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e))
451453
}
452454

453455
/// Returns a list of the currently tracked spendable outputs.
@@ -504,12 +506,21 @@ where
504506

505507
// See if there is anything to sweep before requesting a change address.
506508
{
507-
let sweeper_state = self.sweeper_state.lock().unwrap();
509+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
508510

509511
let cur_height = sweeper_state.persistent.best_block.height;
510512
let has_respends =
511513
sweeper_state.persistent.outputs.iter().any(|o| filter_fn(o, cur_height));
512514
if !has_respends {
515+
// If there is nothing to sweep, we still persist the state if it is dirty.
516+
if sweeper_state.dirty {
517+
self.persist_state(&sweeper_state.persistent)
518+
.inspect(|_| sweeper_state.dirty = false)
519+
.map_err(|e| {
520+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e)
521+
})?;
522+
}
523+
513524
return Ok(());
514525
}
515526
}
@@ -534,7 +545,8 @@ where
534545
.collect();
535546

536547
if respend_descriptors.is_empty() {
537-
// It could be that a tx confirmed and there is now nothing to sweep anymore.
548+
// It could be that a tx confirmed and there is now nothing to sweep anymore. If there is dirty state,
549+
// we'll persist it in the next cycle.
538550
return Ok(());
539551
}
540552

@@ -567,9 +579,9 @@ where
567579
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
568580
}
569581

570-
self.persist_state(&sweeper_state).map_err(|e| {
571-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
572-
})?;
582+
self.persist_state(&sweeper_state.persistent)
583+
.inspect(|_| sweeper_state.dirty = false)
584+
.map_err(|e| log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e))?;
573585

574586
self.broadcaster.broadcast_transactions(&[&spending_tx]);
575587
}
@@ -595,6 +607,8 @@ where
595607
}
596608
true
597609
});
610+
611+
sweeper_state.dirty = true;
598612
}
599613

600614
fn persist_state(&self, sweeper_state: &PersistentSweeperState) -> Result<(), io::Error> {
@@ -648,13 +662,17 @@ where
648662
}
649663
}
650664
}
665+
666+
sweeper_state.dirty = true;
651667
}
652668

653669
fn best_block_updated_internal(
654670
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
655671
) {
656672
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height);
657673
self.prune_confirmed_outputs(sweeper_state);
674+
675+
sweeper_state.dirty = true;
658676
}
659677
}
660678

@@ -678,12 +696,8 @@ where
678696
assert_eq!(state_lock.persistent.best_block.height, height - 1,
679697
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
680698

681-
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
682-
self.best_block_updated_internal(&mut *state_lock, header, height);
683-
684-
let _ = self.persist_state(&*state_lock).map_err(|e| {
685-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
686-
});
699+
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
700+
self.best_block_updated_internal(&mut state_lock, header, height);
687701
}
688702

689703
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -705,9 +719,7 @@ where
705719
}
706720
}
707721

708-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
709-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
710-
});
722+
state_lock.dirty = true;
711723
}
712724
}
713725

@@ -727,9 +739,6 @@ where
727739
) {
728740
let mut state_lock = self.sweeper_state.lock().unwrap();
729741
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
730-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
731-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
732-
});
733742
}
734743

735744
fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -752,18 +761,13 @@ where
752761
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
753762
.for_each(|o| o.status.unconfirmed());
754763

755-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
756-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
757-
});
764+
state_lock.dirty = true;
758765
}
759766
}
760767

761768
fn best_block_updated(&self, header: &Header, height: u32) {
762769
let mut state_lock = self.sweeper_state.lock().unwrap();
763-
self.best_block_updated_internal(&mut *state_lock, header, height);
764-
let _ = self.persist_state(&*state_lock).map_err(|e| {
765-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
766-
});
770+
self.best_block_updated_internal(&mut state_lock, header, height);
767771
}
768772

769773
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -792,6 +796,7 @@ where
792796
#[derive(Debug)]
793797
struct SweeperState {
794798
persistent: PersistentSweeperState,
799+
dirty: bool,
795800
}
796801

797802
#[derive(Debug, Clone)]
@@ -856,7 +861,7 @@ where
856861
}
857862
}
858863

859-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
864+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
860865
Ok(Self {
861866
sweeper_state,
862867
pending_sweep: AtomicBool::new(false),
@@ -905,7 +910,7 @@ where
905910
}
906911
}
907912

908-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
913+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
909914
Ok((
910915
best_block,
911916
OutputSweeper {

0 commit comments

Comments
 (0)