Skip to content

Commit 36ab2ee

Browse files
committed
Persist sweeper state as part of background process
To prepare for an async kv store trait that must be awaited, this commit moves the kv store calls from the chain notification handlers to the background process. It uses a dirty flag to communicate that there is something to persist. The block height is part of the persisted data. If that data does not make it to disk, the chain notifications are replayed after restart.
1 parent f40326f commit 36ab2ee

File tree

1 file changed

+40
-31
lines changed

1 file changed

+40
-31
lines changed

lightning/src/util/sweep.rs

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,10 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state =
386-
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } });
385+
let sweeper_state = Mutex::new(SweeperState {
386+
persistent: PersistentSweeperState { outputs, best_block },
387+
dirty: false,
388+
});
387389
Self {
388390
sweeper_state,
389391
pending_sweep: AtomicBool::new(false),
@@ -445,9 +447,11 @@ where
445447

446448
state_lock.persistent.outputs.push(output_info);
447449
}
448-
self.persist_state(&state_lock.persistent).map_err(|e| {
449-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
450-
})
450+
self.persist_state(&state_lock.persistent).inspect(|_| state_lock.dirty = false).map_err(
451+
|e| {
452+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
453+
},
454+
)
451455
}
452456

453457
/// Returns a list of the currently tracked spendable outputs.
@@ -504,12 +508,21 @@ where
504508

505509
// See if there is anything to sweep before requesting a change address.
506510
{
507-
let sweeper_state = self.sweeper_state.lock().unwrap();
511+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
508512

509513
let cur_height = sweeper_state.persistent.best_block.height;
510514
let has_respends =
511515
sweeper_state.persistent.outputs.iter().any(|o| filter_fn(o, cur_height));
512516
if !has_respends {
517+
// If there is nothing to sweep, we still persist the state if it is dirty.
518+
if sweeper_state.dirty {
519+
self.persist_state(&sweeper_state.persistent)
520+
.inspect(|_| sweeper_state.dirty = false)
521+
.map_err(|e| {
522+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
523+
})?;
524+
}
525+
513526
return Ok(());
514527
}
515528
}
@@ -534,7 +547,8 @@ where
534547
.collect();
535548

536549
if respend_descriptors.is_empty() {
537-
// It could be that a tx confirmed and there is now nothing to sweep anymore.
550+
// It could be that a tx confirmed and there is now nothing to sweep anymore. If there is dirty state,
551+
// we'll persist it in the next cycle.
538552
return Ok(());
539553
}
540554

@@ -567,9 +581,11 @@ where
567581
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
568582
}
569583

570-
self.persist_state(&sweeper_state.persistent).map_err(|e| {
571-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
572-
})?;
584+
self.persist_state(&sweeper_state.persistent)
585+
.inspect(|_| sweeper_state.dirty = false)
586+
.map_err(|e| {
587+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
588+
})?;
573589

574590
self.broadcaster.broadcast_transactions(&[&spending_tx]);
575591
}
@@ -595,6 +611,8 @@ where
595611
}
596612
true
597613
});
614+
615+
sweeper_state.dirty = true;
598616
}
599617

600618
fn persist_state(&self, sweeper_state: &PersistentSweeperState) -> Result<(), io::Error> {
@@ -648,13 +666,17 @@ where
648666
}
649667
}
650668
}
669+
670+
sweeper_state.dirty = true;
651671
}
652672

653673
fn best_block_updated_internal(
654674
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
655675
) {
656676
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height);
657677
self.prune_confirmed_outputs(sweeper_state);
678+
679+
sweeper_state.dirty = true;
658680
}
659681
}
660682

@@ -678,12 +700,8 @@ where
678700
assert_eq!(state_lock.persistent.best_block.height, height - 1,
679701
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
680702

681-
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
682-
self.best_block_updated_internal(&mut *state_lock, header, height);
683-
684-
let _ = self.persist_state(&state_lock.persistent).map_err(|e| {
685-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
686-
});
703+
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
704+
self.best_block_updated_internal(&mut state_lock, header, height);
687705
}
688706

689707
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -705,9 +723,7 @@ where
705723
}
706724
}
707725

708-
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| {
709-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
710-
});
726+
state_lock.dirty = true;
711727
}
712728
}
713729

@@ -727,9 +743,6 @@ where
727743
) {
728744
let mut state_lock = self.sweeper_state.lock().unwrap();
729745
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
730-
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| {
731-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
732-
});
733746
}
734747

735748
fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -752,18 +765,13 @@ where
752765
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
753766
.for_each(|o| o.status.unconfirmed());
754767

755-
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| {
756-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
757-
});
768+
state_lock.dirty = true;
758769
}
759770
}
760771

761772
fn best_block_updated(&self, header: &Header, height: u32) {
762773
let mut state_lock = self.sweeper_state.lock().unwrap();
763-
self.best_block_updated_internal(&mut *state_lock, header, height);
764-
let _ = self.persist_state(&state_lock.persistent).map_err(|e| {
765-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
766-
});
774+
self.best_block_updated_internal(&mut state_lock, header, height);
767775
}
768776

769777
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -792,6 +800,7 @@ where
792800
#[derive(Debug)]
793801
struct SweeperState {
794802
persistent: PersistentSweeperState,
803+
dirty: bool,
795804
}
796805

797806
#[derive(Debug, Clone)]
@@ -856,7 +865,7 @@ where
856865
}
857866
}
858867

859-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
868+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
860869
Ok(Self {
861870
sweeper_state,
862871
pending_sweep: AtomicBool::new(false),
@@ -905,7 +914,7 @@ where
905914
}
906915
}
907916

908-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
917+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
909918
Ok((
910919
best_block,
911920
OutputSweeper {

0 commit comments

Comments
 (0)