Skip to content

Commit f71c795

Browse files
committed
Persist sweeper state as part of background process
To prepare for an async kv store trait that must be awaited, this commit moves the kv store calls from the chain notification handlers to the background process. It uses a dirty flag to communicate that there is something to persist. The block height is part of the persisted data. If that data does not make it to disk, the chain notifications are replayed after restart.
1 parent fd03066 commit f71c795

File tree

1 file changed

+38
-27
lines changed

1 file changed

+38
-27
lines changed

lightning/src/util/sweep.rs

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,10 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state =
386-
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } });
385+
let sweeper_state = Mutex::new(SweeperState {
386+
persistent: PersistentSweeperState { outputs, best_block },
387+
dirty: false,
388+
});
387389
Self {
388390
sweeper_state,
389391
pending_sweep: AtomicBool::new(false),
@@ -445,7 +447,7 @@ where
445447

446448
state_lock.persistent.outputs.push(output_info);
447449
}
448-
self.persist_state(&state_lock).map_err(|e| {
450+
self.flush_state(&mut state_lock).map_err(|e| {
449451
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
450452
})
451453
}
@@ -473,7 +475,19 @@ where
473475
return Ok(());
474476
}
475477

476-
let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await;
478+
let result = {
479+
self.regenerate_and_broadcast_spend_if_necessary_internal().await?;
480+
481+
// If there is still dirty state, we need to persist it.
482+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
483+
if sweeper_state.dirty {
484+
self.flush_state(&mut sweeper_state).map_err(|e| {
485+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
486+
})
487+
} else {
488+
Ok(())
489+
}
490+
};
477491

478492
// Release the pending sweep flag again, regardless of result.
479493
self.pending_sweep.store(false, Ordering::Release);
@@ -567,7 +581,7 @@ where
567581
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
568582
}
569583

570-
self.persist_state(&sweeper_state).map_err(|e| {
584+
self.flush_state(&mut sweeper_state).map_err(|e| {
571585
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
572586
})?;
573587

@@ -595,9 +609,12 @@ where
595609
}
596610
true
597611
});
612+
613+
sweeper_state.dirty = true;
598614
}
599615

600-
fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
616+
/// Flushes the current state to the persistence layer and marks the state as clean.
617+
fn flush_state(&self, sweeper_state: &mut SweeperState) -> Result<(), io::Error> {
601618
self.kv_store
602619
.write(
603620
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -616,6 +633,9 @@ where
616633
);
617634
e
618635
})
636+
.map(|_| {
637+
sweeper_state.dirty = false;
638+
})
619639
}
620640

621641
fn spend_outputs(
@@ -648,13 +668,17 @@ where
648668
}
649669
}
650670
}
671+
672+
sweeper_state.dirty = true;
651673
}
652674

653675
fn best_block_updated_internal(
654676
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
655677
) {
656678
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height);
657679
self.prune_confirmed_outputs(sweeper_state);
680+
681+
sweeper_state.dirty = true;
658682
}
659683
}
660684

@@ -678,12 +702,8 @@ where
678702
assert_eq!(state_lock.persistent.best_block.height, height - 1,
679703
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
680704

681-
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
682-
self.best_block_updated_internal(&mut *state_lock, header, height);
683-
684-
let _ = self.persist_state(&*state_lock).map_err(|e| {
685-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
686-
});
705+
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
706+
self.best_block_updated_internal(&mut state_lock, header, height);
687707
}
688708

689709
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -705,9 +725,7 @@ where
705725
}
706726
}
707727

708-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
709-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
710-
});
728+
state_lock.dirty = true;
711729
}
712730
}
713731

@@ -727,9 +745,6 @@ where
727745
) {
728746
let mut state_lock = self.sweeper_state.lock().unwrap();
729747
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
730-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
731-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
732-
});
733748
}
734749

735750
fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -752,18 +767,13 @@ where
752767
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
753768
.for_each(|o| o.status.unconfirmed());
754769

755-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
756-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
757-
});
770+
state_lock.dirty = true;
758771
}
759772
}
760773

761774
fn best_block_updated(&self, header: &Header, height: u32) {
762775
let mut state_lock = self.sweeper_state.lock().unwrap();
763-
self.best_block_updated_internal(&mut *state_lock, header, height);
764-
let _ = self.persist_state(&*state_lock).map_err(|e| {
765-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
766-
});
776+
self.best_block_updated_internal(&mut state_lock, header, height);
767777
}
768778

769779
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -792,6 +802,7 @@ where
792802
#[derive(Debug)]
793803
struct SweeperState {
794804
persistent: PersistentSweeperState,
805+
dirty: bool,
795806
}
796807

797808
#[derive(Debug, Clone)]
@@ -856,7 +867,7 @@ where
856867
}
857868
}
858869

859-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
870+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
860871
Ok(Self {
861872
sweeper_state,
862873
pending_sweep: AtomicBool::new(false),
@@ -905,7 +916,7 @@ where
905916
}
906917
}
907918

908-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
919+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
909920
Ok((
910921
best_block,
911922
OutputSweeper {

0 commit comments

Comments
 (0)