Skip to content

Commit c09d41c

Browse files
committed
Separate sweeper persistent state
Prepare for adding runtime state while avoiding the _unused serialization macro config.
1 parent afe4285 commit c09d41c

File tree

1 file changed

+52
-37
lines changed

1 file changed

+52
-37
lines changed

lightning/src/util/sweep.rs

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,8 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
385+
let sweeper_state =
386+
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } });
386387
Self {
387388
sweeper_state,
388389
pending_sweep: AtomicBool::new(false),
@@ -437,27 +438,27 @@ where
437438
},
438439
};
439440

440-
if state_lock.outputs.iter().find(|o| o.descriptor == output_info.descriptor).is_some()
441-
{
441+
let mut outputs = state_lock.persistent.outputs.iter();
442+
if outputs.find(|o| o.descriptor == output_info.descriptor).is_some() {
442443
continue;
443444
}
444445

445-
state_lock.outputs.push(output_info);
446+
state_lock.persistent.outputs.push(output_info);
446447
}
447-
self.persist_state(&*state_lock).map_err(|e| {
448+
self.persist_state(&state_lock.persistent).map_err(|e| {
448449
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
449450
})
450451
}
451452

452453
/// Returns a list of the currently tracked spendable outputs.
453454
pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
454-
self.sweeper_state.lock().unwrap().outputs.clone()
455+
self.sweeper_state.lock().unwrap().persistent.outputs.clone()
455456
}
456457

457458
/// Gets the latest best block which was connected either via the [`Listen`] or
458459
/// [`Confirm`] interfaces.
459460
pub fn current_best_block(&self) -> BestBlock {
460-
self.sweeper_state.lock().unwrap().best_block
461+
self.sweeper_state.lock().unwrap().persistent.best_block
461462
}
462463

463464
/// Regenerates and broadcasts the spending transaction for any outputs that are pending. This method will be a
@@ -505,8 +506,9 @@ where
505506
{
506507
let sweeper_state = self.sweeper_state.lock().unwrap();
507508

508-
let cur_height = sweeper_state.best_block.height;
509-
let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
509+
let cur_height = sweeper_state.persistent.best_block.height;
510+
let has_respends =
511+
sweeper_state.persistent.outputs.iter().any(|o| filter_fn(o, cur_height));
510512
if !has_respends {
511513
return Ok(());
512514
}
@@ -520,10 +522,11 @@ where
520522
{
521523
let mut sweeper_state = self.sweeper_state.lock().unwrap();
522524

523-
let cur_height = sweeper_state.best_block.height;
524-
let cur_hash = sweeper_state.best_block.block_hash;
525+
let cur_height = sweeper_state.persistent.best_block.height;
526+
let cur_hash = sweeper_state.persistent.best_block.block_hash;
525527

526528
let respend_descriptors: Vec<&SpendableOutputDescriptor> = sweeper_state
529+
.persistent
527530
.outputs
528531
.iter()
529532
.filter(|o| filter_fn(*o, cur_height))
@@ -536,7 +539,11 @@ where
536539
}
537540

538541
let spending_tx = self
539-
.spend_outputs(&sweeper_state, &respend_descriptors, change_destination_script)
542+
.spend_outputs(
543+
&sweeper_state.persistent,
544+
&respend_descriptors,
545+
change_destination_script,
546+
)
540547
.map_err(|e| {
541548
log_error!(self.logger, "Error spending outputs: {:?}", e);
542549
})?;
@@ -550,7 +557,7 @@ where
550557
// As we didn't modify the state so far, the same filter_fn yields the same elements as
551558
// above.
552559
let respend_outputs =
553-
sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height));
560+
sweeper_state.persistent.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height));
554561
for output_info in respend_outputs {
555562
if let Some(filter) = self.chain_data_source.as_ref() {
556563
let watched_output = output_info.to_watched_output(cur_hash);
@@ -560,7 +567,7 @@ where
560567
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
561568
}
562569

563-
self.persist_state(&sweeper_state).map_err(|e| {
570+
self.persist_state(&sweeper_state.persistent).map_err(|e| {
564571
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
565572
})?;
566573

@@ -571,10 +578,10 @@ where
571578
}
572579

573580
fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
574-
let cur_height = sweeper_state.best_block.height;
581+
let cur_height = sweeper_state.persistent.best_block.height;
575582

576583
// Prune all outputs that have sufficient depth by now.
577-
sweeper_state.outputs.retain(|o| {
584+
sweeper_state.persistent.outputs.retain(|o| {
578585
if let Some(confirmation_height) = o.status.confirmation_height() {
579586
// We wait at least `PRUNE_DELAY_BLOCKS` as before that
580587
// `Event::SpendableOutputs` from lingering monitors might get replayed.
@@ -590,7 +597,7 @@ where
590597
});
591598
}
592599

593-
fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
600+
fn persist_state(&self, sweeper_state: &PersistentSweeperState) -> Result<(), io::Error> {
594601
self.kv_store
595602
.write(
596603
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -612,7 +619,7 @@ where
612619
}
613620

614621
fn spend_outputs(
615-
&self, sweeper_state: &SweeperState, descriptors: &[&SpendableOutputDescriptor],
622+
&self, sweeper_state: &PersistentSweeperState, descriptors: &[&SpendableOutputDescriptor],
616623
change_destination_script: ScriptBuf,
617624
) -> Result<Transaction, ()> {
618625
let tx_feerate =
@@ -635,7 +642,7 @@ where
635642
) {
636643
let confirmation_hash = header.block_hash();
637644
for (_, tx) in txdata {
638-
for output_info in sweeper_state.outputs.iter_mut() {
645+
for output_info in sweeper_state.persistent.outputs.iter_mut() {
639646
if output_info.is_spent_in(*tx) {
640647
output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
641648
}
@@ -646,7 +653,7 @@ where
646653
fn best_block_updated_internal(
647654
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
648655
) {
649-
sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
656+
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height);
650657
self.prune_confirmed_outputs(sweeper_state);
651658
}
652659
}
@@ -666,15 +673,15 @@ where
666673
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
667674
) {
668675
let mut state_lock = self.sweeper_state.lock().unwrap();
669-
assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
676+
assert_eq!(state_lock.persistent.best_block.block_hash, header.prev_blockhash,
670677
"Blocks must be connected in chain-order - the connected header must build on the last connected header");
671-
assert_eq!(state_lock.best_block.height, height - 1,
678+
assert_eq!(state_lock.persistent.best_block.height, height - 1,
672679
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
673680

674681
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
675682
self.best_block_updated_internal(&mut *state_lock, header, height);
676683

677-
let _ = self.persist_state(&*state_lock).map_err(|e| {
684+
let _ = self.persist_state(&state_lock.persistent).map_err(|e| {
678685
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
679686
});
680687
}
@@ -685,20 +692,20 @@ where
685692
let new_height = height - 1;
686693
let block_hash = header.block_hash();
687694

688-
assert_eq!(state_lock.best_block.block_hash, block_hash,
695+
assert_eq!(state_lock.persistent.best_block.block_hash, block_hash,
689696
"Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
690-
assert_eq!(state_lock.best_block.height, height,
697+
assert_eq!(state_lock.persistent.best_block.height, height,
691698
"Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
692-
state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height);
699+
state_lock.persistent.best_block = BestBlock::new(header.prev_blockhash, new_height);
693700

694-
for output_info in state_lock.outputs.iter_mut() {
701+
for output_info in state_lock.persistent.outputs.iter_mut() {
695702
if output_info.status.confirmation_hash() == Some(block_hash) {
696703
debug_assert_eq!(output_info.status.confirmation_height(), Some(height));
697704
output_info.status.unconfirmed();
698705
}
699706
}
700707

701-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
708+
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| {
702709
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
703710
});
704711
}
@@ -720,7 +727,7 @@ where
720727
) {
721728
let mut state_lock = self.sweeper_state.lock().unwrap();
722729
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
723-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
730+
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| {
724731
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
725732
});
726733
}
@@ -730,6 +737,7 @@ where
730737

731738
// Get what height was unconfirmed.
732739
let unconf_height = state_lock
740+
.persistent
733741
.outputs
734742
.iter()
735743
.find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid))
@@ -738,12 +746,13 @@ where
738746
if let Some(unconf_height) = unconf_height {
739747
// Unconfirm all >= this height.
740748
state_lock
749+
.persistent
741750
.outputs
742751
.iter_mut()
743752
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
744753
.for_each(|o| o.status.unconfirmed());
745754

746-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
755+
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| {
747756
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
748757
});
749758
}
@@ -752,14 +761,15 @@ where
752761
fn best_block_updated(&self, header: &Header, height: u32) {
753762
let mut state_lock = self.sweeper_state.lock().unwrap();
754763
self.best_block_updated_internal(&mut *state_lock, header, height);
755-
let _ = self.persist_state(&*state_lock).map_err(|e| {
764+
let _ = self.persist_state(&state_lock.persistent).map_err(|e| {
756765
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
757766
});
758767
}
759768

760769
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
761770
let state_lock = self.sweeper_state.lock().unwrap();
762771
state_lock
772+
.persistent
763773
.outputs
764774
.iter()
765775
.filter_map(|o| match o.status {
@@ -779,13 +789,18 @@ where
779789
}
780790
}
781791

782-
#[derive(Debug, Clone)]
792+
#[derive(Debug)]
783793
struct SweeperState {
794+
persistent: PersistentSweeperState,
795+
}
796+
797+
#[derive(Debug, Clone)]
798+
struct PersistentSweeperState {
784799
outputs: Vec<TrackedSpendableOutput>,
785800
best_block: BestBlock,
786801
}
787802

788-
impl_writeable_tlv_based!(SweeperState, {
803+
impl_writeable_tlv_based!(PersistentSweeperState, {
789804
(0, outputs, required_vec),
790805
(2, best_block, required),
791806
});
@@ -831,7 +846,7 @@ where
831846
kv_store,
832847
logger,
833848
) = args;
834-
let state = SweeperState::read(reader)?;
849+
let state = PersistentSweeperState::read(reader)?;
835850
let best_block = state.best_block;
836851

837852
if let Some(filter) = chain_data_source.as_ref() {
@@ -841,7 +856,7 @@ where
841856
}
842857
}
843858

844-
let sweeper_state = Mutex::new(state);
859+
let sweeper_state = Mutex::new(SweeperState { persistent: state });
845860
Ok(Self {
846861
sweeper_state,
847862
pending_sweep: AtomicBool::new(false),
@@ -880,7 +895,7 @@ where
880895
kv_store,
881896
logger,
882897
) = args;
883-
let state = SweeperState::read(reader)?;
898+
let state = PersistentSweeperState::read(reader)?;
884899
let best_block = state.best_block;
885900

886901
if let Some(filter) = chain_data_source.as_ref() {
@@ -890,7 +905,7 @@ where
890905
}
891906
}
892907

893-
let sweeper_state = Mutex::new(state);
908+
let sweeper_state = Mutex::new(SweeperState { persistent: state });
894909
Ok((
895910
best_block,
896911
OutputSweeper {

0 commit comments

Comments
 (0)