Skip to content

Commit 6138980

Browse files
committed
Persist sweeper state as part of background process
To prepare for an async kv store trait that must be awaited, this commit moves the kv store calls from the chain notification handlers to the background process. It uses a dirty flag to communicate that there is something to persist. The block height is part of the persisted data. If that data does not make it to disk, the chain notifications are replayed after restart.
1 parent 6d52f3a commit 6138980

File tree

1 file changed

+38
-27
lines changed

1 file changed

+38
-27
lines changed

lightning/src/util/sweep.rs

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,10 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state =
386-
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } });
385+
let sweeper_state = Mutex::new(SweeperState {
386+
persistent: PersistentSweeperState { outputs, best_block },
387+
dirty: false,
388+
});
387389
Self {
388390
sweeper_state,
389391
pending_sweep: AtomicBool::new(false),
@@ -450,7 +452,7 @@ where
450452

451453
state_lock.persistent.outputs.push(output_info);
452454
}
453-
self.persist_state(&state_lock).map_err(|e| {
455+
self.flush_state(&mut state_lock).map_err(|e| {
454456
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
455457
})
456458
}
@@ -478,7 +480,19 @@ where
478480
return Ok(());
479481
}
480482

481-
let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await;
483+
let result = {
484+
self.regenerate_and_broadcast_spend_if_necessary_internal().await?;
485+
486+
// If there is still dirty state, we need to persist it.
487+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
488+
if sweeper_state.dirty {
489+
self.flush_state(&mut sweeper_state).map_err(|e| {
490+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
491+
})
492+
} else {
493+
Ok(())
494+
}
495+
};
482496

483497
// Release the pending sweep flag again, regardless of result.
484498
self.pending_sweep.store(false, Ordering::Release);
@@ -572,7 +586,7 @@ where
572586
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
573587
}
574588

575-
self.persist_state(&sweeper_state).map_err(|e| {
589+
self.flush_state(&mut sweeper_state).map_err(|e| {
576590
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
577591
})?;
578592

@@ -600,9 +614,12 @@ where
600614
}
601615
true
602616
});
617+
618+
sweeper_state.dirty = true;
603619
}
604620

605-
fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
621+
/// Flushes the current state to the persistence layer and marks the state as clean.
622+
fn flush_state(&self, sweeper_state: &mut SweeperState) -> Result<(), io::Error> {
606623
self.kv_store
607624
.write(
608625
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -621,6 +638,9 @@ where
621638
);
622639
e
623640
})
641+
.map(|_| {
642+
sweeper_state.dirty = false;
643+
})
624644
}
625645

626646
fn spend_outputs(
@@ -653,13 +673,17 @@ where
653673
}
654674
}
655675
}
676+
677+
sweeper_state.dirty = true;
656678
}
657679

658680
fn best_block_updated_internal(
659681
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
660682
) {
661683
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height);
662684
self.prune_confirmed_outputs(sweeper_state);
685+
686+
sweeper_state.dirty = true;
663687
}
664688
}
665689

@@ -683,12 +707,8 @@ where
683707
assert_eq!(state_lock.persistent.best_block.height, height - 1,
684708
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
685709

686-
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
687-
self.best_block_updated_internal(&mut *state_lock, header, height);
688-
689-
let _ = self.persist_state(&*state_lock).map_err(|e| {
690-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
691-
});
710+
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
711+
self.best_block_updated_internal(&mut state_lock, header, height);
692712
}
693713

694714
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -710,9 +730,7 @@ where
710730
}
711731
}
712732

713-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
714-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
715-
});
733+
state_lock.dirty = true;
716734
}
717735
}
718736

@@ -732,9 +750,6 @@ where
732750
) {
733751
let mut state_lock = self.sweeper_state.lock().unwrap();
734752
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
735-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
736-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
737-
});
738753
}
739754

740755
fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -757,18 +772,13 @@ where
757772
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
758773
.for_each(|o| o.status.unconfirmed());
759774

760-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
761-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
762-
});
775+
state_lock.dirty = true;
763776
}
764777
}
765778

766779
fn best_block_updated(&self, header: &Header, height: u32) {
767780
let mut state_lock = self.sweeper_state.lock().unwrap();
768-
self.best_block_updated_internal(&mut *state_lock, header, height);
769-
let _ = self.persist_state(&*state_lock).map_err(|e| {
770-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
771-
});
781+
self.best_block_updated_internal(&mut state_lock, header, height);
772782
}
773783

774784
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -797,6 +807,7 @@ where
797807
#[derive(Debug)]
798808
struct SweeperState {
799809
persistent: PersistentSweeperState,
810+
dirty: bool,
800811
}
801812

802813
#[derive(Debug, Clone)]
@@ -861,7 +872,7 @@ where
861872
}
862873
}
863874

864-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
875+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
865876
Ok(Self {
866877
sweeper_state,
867878
pending_sweep: AtomicBool::new(false),
@@ -910,7 +921,7 @@ where
910921
}
911922
}
912923

913-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
924+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
914925
Ok((
915926
best_block,
916927
OutputSweeper {

0 commit comments

Comments
 (0)