Skip to content

Commit adb206a

Browse files
committed
Separate sweeper persistent state
Prepare for adding runtime state.
1 parent 0848e7a commit adb206a

File tree

1 file changed

+49
-30
lines changed

1 file changed

+49
-30
lines changed

lightning/src/util/sweep.rs

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,8 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
385+
let sweeper_state =
386+
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } });
386387
Self {
387388
sweeper_state,
388389
pending_sweep: AtomicBool::new(false),
@@ -437,27 +438,32 @@ where
437438
},
438439
};
439440

440-
if state_lock.outputs.iter().find(|o| o.descriptor == output_info.descriptor).is_some()
441+
if state_lock
442+
.persistent
443+
.outputs
444+
.iter()
445+
.find(|o| o.descriptor == output_info.descriptor)
446+
.is_some()
441447
{
442448
continue;
443449
}
444450

445-
state_lock.outputs.push(output_info);
451+
state_lock.persistent.outputs.push(output_info);
446452
}
447-
self.persist_state(&*state_lock).map_err(|e| {
453+
self.persist_state(&state_lock).map_err(|e| {
448454
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
449455
})
450456
}
451457

452458
/// Returns a list of the currently tracked spendable outputs.
453459
pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
454-
self.sweeper_state.lock().unwrap().outputs.clone()
460+
self.sweeper_state.lock().unwrap().persistent.outputs.clone()
455461
}
456462

457463
/// Gets the latest best block which was connected either via the [`Listen`] or
458464
/// [`Confirm`] interfaces.
459465
pub fn current_best_block(&self) -> BestBlock {
460-
self.sweeper_state.lock().unwrap().best_block
466+
self.sweeper_state.lock().unwrap().persistent.best_block
461467
}
462468

463469
/// Regenerates and broadcasts the spending transaction for any outputs that are pending. This method will be a
@@ -503,7 +509,7 @@ where
503509

504510
// See if there is anything to sweep before requesting a change address.
505511
{
506-
let sweeper_state = self.sweeper_state.lock().unwrap();
512+
let sweeper_state = &self.sweeper_state.lock().unwrap().persistent;
507513

508514
let cur_height = sweeper_state.best_block.height;
509515
let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
@@ -518,12 +524,13 @@ where
518524

519525
// Sweep the outputs.
520526
{
521-
let mut sweeper_state = self.sweeper_state.lock().unwrap();
527+
let mut sweeper_state = &mut self.sweeper_state.lock().unwrap();
522528

523-
let cur_height = sweeper_state.best_block.height;
524-
let cur_hash = sweeper_state.best_block.block_hash;
529+
let cur_height = sweeper_state.persistent.best_block.height;
530+
let cur_hash = sweeper_state.persistent.best_block.block_hash;
525531

526532
let respend_descriptors: Vec<&SpendableOutputDescriptor> = sweeper_state
533+
.persistent
527534
.outputs
528535
.iter()
529536
.filter(|o| filter_fn(*o, cur_height))
@@ -536,7 +543,11 @@ where
536543
}
537544

538545
let spending_tx = self
539-
.spend_outputs(&sweeper_state, &respend_descriptors, change_destination_script)
546+
.spend_outputs(
547+
&sweeper_state.persistent,
548+
&respend_descriptors,
549+
change_destination_script,
550+
)
540551
.map_err(|e| {
541552
log_error!(self.logger, "Error spending outputs: {:?}", e);
542553
})?;
@@ -550,7 +561,7 @@ where
550561
// As we didn't modify the state so far, the same filter_fn yields the same elements as
551562
// above.
552563
let respend_outputs =
553-
sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height));
564+
sweeper_state.persistent.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height));
554565
for output_info in respend_outputs {
555566
if let Some(filter) = self.chain_data_source.as_ref() {
556567
let watched_output = output_info.to_watched_output(cur_hash);
@@ -571,10 +582,10 @@ where
571582
}
572583

573584
fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
574-
let cur_height = sweeper_state.best_block.height;
585+
let cur_height = sweeper_state.persistent.best_block.height;
575586

576587
// Prune all outputs that have sufficient depth by now.
577-
sweeper_state.outputs.retain(|o| {
588+
sweeper_state.persistent.outputs.retain(|o| {
578589
if let Some(confirmation_height) = o.status.confirmation_height() {
579590
// We wait at least `PRUNE_DELAY_BLOCKS` as before that
580591
// `Event::SpendableOutputs` from lingering monitors might get replayed.
@@ -596,7 +607,7 @@ where
596607
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
597608
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
598609
OUTPUT_SWEEPER_PERSISTENCE_KEY,
599-
&sweeper_state.encode(),
610+
&sweeper_state.persistent.encode(),
600611
)
601612
.map_err(|e| {
602613
log_error!(
@@ -612,7 +623,7 @@ where
612623
}
613624

614625
fn spend_outputs(
615-
&self, sweeper_state: &SweeperState, descriptors: &[&SpendableOutputDescriptor],
626+
&self, sweeper_state: &PersistentSweeperState, descriptors: &[&SpendableOutputDescriptor],
616627
change_destination_script: ScriptBuf,
617628
) -> Result<Transaction, ()> {
618629
let tx_feerate =
@@ -635,7 +646,7 @@ where
635646
) {
636647
let confirmation_hash = header.block_hash();
637648
for (_, tx) in txdata {
638-
for output_info in sweeper_state.outputs.iter_mut() {
649+
for output_info in sweeper_state.persistent.outputs.iter_mut() {
639650
if output_info.is_spent_in(*tx) {
640651
output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
641652
}
@@ -646,7 +657,7 @@ where
646657
fn best_block_updated_internal(
647658
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
648659
) {
649-
sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
660+
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height);
650661
self.prune_confirmed_outputs(sweeper_state);
651662
}
652663
}
@@ -666,9 +677,9 @@ where
666677
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
667678
) {
668679
let mut state_lock = self.sweeper_state.lock().unwrap();
669-
assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
680+
assert_eq!(state_lock.persistent.best_block.block_hash, header.prev_blockhash,
670681
"Blocks must be connected in chain-order - the connected header must build on the last connected header");
671-
assert_eq!(state_lock.best_block.height, height - 1,
682+
assert_eq!(state_lock.persistent.best_block.height, height - 1,
672683
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
673684

674685
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
@@ -685,13 +696,13 @@ where
685696
let new_height = height - 1;
686697
let block_hash = header.block_hash();
687698

688-
assert_eq!(state_lock.best_block.block_hash, block_hash,
699+
assert_eq!(state_lock.persistent.best_block.block_hash, block_hash,
689700
"Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
690-
assert_eq!(state_lock.best_block.height, height,
701+
assert_eq!(state_lock.persistent.best_block.height, height,
691702
"Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
692-
state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height);
703+
state_lock.persistent.best_block = BestBlock::new(header.prev_blockhash, new_height);
693704

694-
for output_info in state_lock.outputs.iter_mut() {
705+
for output_info in state_lock.persistent.outputs.iter_mut() {
695706
if output_info.status.confirmation_hash() == Some(block_hash) {
696707
debug_assert_eq!(output_info.status.confirmation_height(), Some(height));
697708
output_info.status.unconfirmed();
@@ -730,6 +741,7 @@ where
730741

731742
// Get what height was unconfirmed.
732743
let unconf_height = state_lock
744+
.persistent
733745
.outputs
734746
.iter()
735747
.find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid))
@@ -738,6 +750,7 @@ where
738750
if let Some(unconf_height) = unconf_height {
739751
// Unconfirm all >= this height.
740752
state_lock
753+
.persistent
741754
.outputs
742755
.iter_mut()
743756
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
@@ -760,6 +773,7 @@ where
760773
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
761774
let state_lock = self.sweeper_state.lock().unwrap();
762775
state_lock
776+
.persistent
763777
.outputs
764778
.iter()
765779
.filter_map(|o| match o.status {
@@ -779,13 +793,18 @@ where
779793
}
780794
}
781795

782-
#[derive(Debug, Clone)]
796+
#[derive(Debug)]
783797
struct SweeperState {
798+
persistent: PersistentSweeperState,
799+
}
800+
801+
#[derive(Debug, Clone)]
802+
struct PersistentSweeperState {
784803
outputs: Vec<TrackedSpendableOutput>,
785804
best_block: BestBlock,
786805
}
787806

788-
impl_writeable_tlv_based!(SweeperState, {
807+
impl_writeable_tlv_based!(PersistentSweeperState, {
789808
(0, outputs, required_vec),
790809
(2, best_block, required),
791810
});
@@ -831,7 +850,7 @@ where
831850
kv_store,
832851
logger,
833852
) = args;
834-
let state = SweeperState::read(reader)?;
853+
let state = PersistentSweeperState::read(reader)?;
835854
let best_block = state.best_block;
836855

837856
if let Some(filter) = chain_data_source.as_ref() {
@@ -841,7 +860,7 @@ where
841860
}
842861
}
843862

844-
let sweeper_state = Mutex::new(state);
863+
let sweeper_state = Mutex::new(SweeperState { persistent: state });
845864
Ok(Self {
846865
sweeper_state,
847866
pending_sweep: AtomicBool::new(false),
@@ -880,7 +899,7 @@ where
880899
kv_store,
881900
logger,
882901
) = args;
883-
let state = SweeperState::read(reader)?;
902+
let state = PersistentSweeperState::read(reader)?;
884903
let best_block = state.best_block;
885904

886905
if let Some(filter) = chain_data_source.as_ref() {
@@ -890,7 +909,7 @@ where
890909
}
891910
}
892911

893-
let sweeper_state = Mutex::new(state);
912+
let sweeper_state = Mutex::new(SweeperState { persistent: state });
894913
Ok((
895914
best_block,
896915
OutputSweeper {

0 commit comments

Comments
 (0)