Skip to content

Commit 6d52f3a

Browse files
committed
Separate sweeper persistent state
Prepare for adding runtime state.
1 parent 0848e7a commit 6d52f3a

File tree

1 file changed

+50
-30
lines changed

1 file changed

+50
-30
lines changed

lightning/src/util/sweep.rs

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,8 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
385+
let sweeper_state =
386+
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } });
386387
Self {
387388
sweeper_state,
388389
pending_sweep: AtomicBool::new(false),
@@ -437,27 +438,32 @@ where
437438
},
438439
};
439440

440-
if state_lock.outputs.iter().find(|o| o.descriptor == output_info.descriptor).is_some()
441+
if state_lock
442+
.persistent
443+
.outputs
444+
.iter()
445+
.find(|o| o.descriptor == output_info.descriptor)
446+
.is_some()
441447
{
442448
continue;
443449
}
444450

445-
state_lock.outputs.push(output_info);
451+
state_lock.persistent.outputs.push(output_info);
446452
}
447-
self.persist_state(&*state_lock).map_err(|e| {
453+
self.persist_state(&state_lock).map_err(|e| {
448454
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
449455
})
450456
}
451457

452458
/// Returns a list of the currently tracked spendable outputs.
453459
pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
454-
self.sweeper_state.lock().unwrap().outputs.clone()
460+
self.sweeper_state.lock().unwrap().persistent.outputs.clone()
455461
}
456462

457463
/// Gets the latest best block which was connected either via the [`Listen`] or
458464
/// [`Confirm`] interfaces.
459465
pub fn current_best_block(&self) -> BestBlock {
460-
self.sweeper_state.lock().unwrap().best_block
466+
self.sweeper_state.lock().unwrap().persistent.best_block
461467
}
462468

463469
/// Regenerates and broadcasts the spending transaction for any outputs that are pending. This method will be a
@@ -505,8 +511,9 @@ where
505511
{
506512
let sweeper_state = self.sweeper_state.lock().unwrap();
507513

508-
let cur_height = sweeper_state.best_block.height;
509-
let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
514+
let cur_height = sweeper_state.persistent.best_block.height;
515+
let has_respends =
516+
sweeper_state.persistent.outputs.iter().any(|o| filter_fn(o, cur_height));
510517
if !has_respends {
511518
return Ok(());
512519
}
@@ -520,10 +527,11 @@ where
520527
{
521528
let mut sweeper_state = self.sweeper_state.lock().unwrap();
522529

523-
let cur_height = sweeper_state.best_block.height;
524-
let cur_hash = sweeper_state.best_block.block_hash;
530+
let cur_height = sweeper_state.persistent.best_block.height;
531+
let cur_hash = sweeper_state.persistent.best_block.block_hash;
525532

526533
let respend_descriptors: Vec<&SpendableOutputDescriptor> = sweeper_state
534+
.persistent
527535
.outputs
528536
.iter()
529537
.filter(|o| filter_fn(*o, cur_height))
@@ -536,7 +544,11 @@ where
536544
}
537545

538546
let spending_tx = self
539-
.spend_outputs(&sweeper_state, &respend_descriptors, change_destination_script)
547+
.spend_outputs(
548+
&sweeper_state.persistent,
549+
&respend_descriptors,
550+
change_destination_script,
551+
)
540552
.map_err(|e| {
541553
log_error!(self.logger, "Error spending outputs: {:?}", e);
542554
})?;
@@ -550,7 +562,7 @@ where
550562
// As we didn't modify the state so far, the same filter_fn yields the same elements as
551563
// above.
552564
let respend_outputs =
553-
sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height));
565+
sweeper_state.persistent.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height));
554566
for output_info in respend_outputs {
555567
if let Some(filter) = self.chain_data_source.as_ref() {
556568
let watched_output = output_info.to_watched_output(cur_hash);
@@ -571,10 +583,10 @@ where
571583
}
572584

573585
fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
574-
let cur_height = sweeper_state.best_block.height;
586+
let cur_height = sweeper_state.persistent.best_block.height;
575587

576588
// Prune all outputs that have sufficient depth by now.
577-
sweeper_state.outputs.retain(|o| {
589+
sweeper_state.persistent.outputs.retain(|o| {
578590
if let Some(confirmation_height) = o.status.confirmation_height() {
579591
// We wait at least `PRUNE_DELAY_BLOCKS` as before that
580592
// `Event::SpendableOutputs` from lingering monitors might get replayed.
@@ -596,7 +608,7 @@ where
596608
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
597609
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
598610
OUTPUT_SWEEPER_PERSISTENCE_KEY,
599-
&sweeper_state.encode(),
611+
&sweeper_state.persistent.encode(),
600612
)
601613
.map_err(|e| {
602614
log_error!(
@@ -612,7 +624,7 @@ where
612624
}
613625

614626
fn spend_outputs(
615-
&self, sweeper_state: &SweeperState, descriptors: &[&SpendableOutputDescriptor],
627+
&self, sweeper_state: &PersistentSweeperState, descriptors: &[&SpendableOutputDescriptor],
616628
change_destination_script: ScriptBuf,
617629
) -> Result<Transaction, ()> {
618630
let tx_feerate =
@@ -635,7 +647,7 @@ where
635647
) {
636648
let confirmation_hash = header.block_hash();
637649
for (_, tx) in txdata {
638-
for output_info in sweeper_state.outputs.iter_mut() {
650+
for output_info in sweeper_state.persistent.outputs.iter_mut() {
639651
if output_info.is_spent_in(*tx) {
640652
output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
641653
}
@@ -646,7 +658,7 @@ where
646658
fn best_block_updated_internal(
647659
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
648660
) {
649-
sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
661+
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height);
650662
self.prune_confirmed_outputs(sweeper_state);
651663
}
652664
}
@@ -666,9 +678,9 @@ where
666678
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
667679
) {
668680
let mut state_lock = self.sweeper_state.lock().unwrap();
669-
assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
681+
assert_eq!(state_lock.persistent.best_block.block_hash, header.prev_blockhash,
670682
"Blocks must be connected in chain-order - the connected header must build on the last connected header");
671-
assert_eq!(state_lock.best_block.height, height - 1,
683+
assert_eq!(state_lock.persistent.best_block.height, height - 1,
672684
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
673685

674686
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
@@ -685,13 +697,13 @@ where
685697
let new_height = height - 1;
686698
let block_hash = header.block_hash();
687699

688-
assert_eq!(state_lock.best_block.block_hash, block_hash,
700+
assert_eq!(state_lock.persistent.best_block.block_hash, block_hash,
689701
"Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
690-
assert_eq!(state_lock.best_block.height, height,
702+
assert_eq!(state_lock.persistent.best_block.height, height,
691703
"Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
692-
state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height);
704+
state_lock.persistent.best_block = BestBlock::new(header.prev_blockhash, new_height);
693705

694-
for output_info in state_lock.outputs.iter_mut() {
706+
for output_info in state_lock.persistent.outputs.iter_mut() {
695707
if output_info.status.confirmation_hash() == Some(block_hash) {
696708
debug_assert_eq!(output_info.status.confirmation_height(), Some(height));
697709
output_info.status.unconfirmed();
@@ -730,6 +742,7 @@ where
730742

731743
// Get what height was unconfirmed.
732744
let unconf_height = state_lock
745+
.persistent
733746
.outputs
734747
.iter()
735748
.find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid))
@@ -738,6 +751,7 @@ where
738751
if let Some(unconf_height) = unconf_height {
739752
// Unconfirm all >= this height.
740753
state_lock
754+
.persistent
741755
.outputs
742756
.iter_mut()
743757
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
@@ -760,6 +774,7 @@ where
760774
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
761775
let state_lock = self.sweeper_state.lock().unwrap();
762776
state_lock
777+
.persistent
763778
.outputs
764779
.iter()
765780
.filter_map(|o| match o.status {
@@ -779,13 +794,18 @@ where
779794
}
780795
}
781796

782-
#[derive(Debug, Clone)]
797+
#[derive(Debug)]
783798
struct SweeperState {
799+
persistent: PersistentSweeperState,
800+
}
801+
802+
#[derive(Debug, Clone)]
803+
struct PersistentSweeperState {
784804
outputs: Vec<TrackedSpendableOutput>,
785805
best_block: BestBlock,
786806
}
787807

788-
impl_writeable_tlv_based!(SweeperState, {
808+
impl_writeable_tlv_based!(PersistentSweeperState, {
789809
(0, outputs, required_vec),
790810
(2, best_block, required),
791811
});
@@ -831,7 +851,7 @@ where
831851
kv_store,
832852
logger,
833853
) = args;
834-
let state = SweeperState::read(reader)?;
854+
let state = PersistentSweeperState::read(reader)?;
835855
let best_block = state.best_block;
836856

837857
if let Some(filter) = chain_data_source.as_ref() {
@@ -841,7 +861,7 @@ where
841861
}
842862
}
843863

844-
let sweeper_state = Mutex::new(state);
864+
let sweeper_state = Mutex::new(SweeperState { persistent: state });
845865
Ok(Self {
846866
sweeper_state,
847867
pending_sweep: AtomicBool::new(false),
@@ -880,7 +900,7 @@ where
880900
kv_store,
881901
logger,
882902
) = args;
883-
let state = SweeperState::read(reader)?;
903+
let state = PersistentSweeperState::read(reader)?;
884904
let best_block = state.best_block;
885905

886906
if let Some(filter) = chain_data_source.as_ref() {
@@ -890,7 +910,7 @@ where
890910
}
891911
}
892912

893-
let sweeper_state = Mutex::new(state);
913+
let sweeper_state = Mutex::new(SweeperState { persistent: state });
894914
Ok((
895915
best_block,
896916
OutputSweeper {

0 commit comments

Comments
 (0)