From c09d41c20ccef55d222af489af839ba5f7d49e45 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Jun 2025 10:14:28 +0200 Subject: [PATCH 1/2] Separate sweeper persistent state Prepare for adding runtime state while avoiding the _unused serialization macro config. --- lightning/src/util/sweep.rs | 89 ++++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 37 deletions(-) diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index 0fae91bebc2..a80e6b524c1 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -382,7 +382,8 @@ where output_spender: O, change_destination_source: D, kv_store: K, logger: L, ) -> Self { let outputs = Vec::new(); - let sweeper_state = Mutex::new(SweeperState { outputs, best_block }); + let sweeper_state = + Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } }); Self { sweeper_state, pending_sweep: AtomicBool::new(false), @@ -437,27 +438,27 @@ where }, }; - if state_lock.outputs.iter().find(|o| o.descriptor == output_info.descriptor).is_some() - { + let mut outputs = state_lock.persistent.outputs.iter(); + if outputs.find(|o| o.descriptor == output_info.descriptor).is_some() { continue; } - state_lock.outputs.push(output_info); + state_lock.persistent.outputs.push(output_info); } - self.persist_state(&*state_lock).map_err(|e| { + self.persist_state(&state_lock.persistent).map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); }) } /// Returns a list of the currently tracked spendable outputs. pub fn tracked_spendable_outputs(&self) -> Vec { - self.sweeper_state.lock().unwrap().outputs.clone() + self.sweeper_state.lock().unwrap().persistent.outputs.clone() } /// Gets the latest best block which was connected either via the [`Listen`] or /// [`Confirm`] interfaces. pub fn current_best_block(&self) -> BestBlock { - self.sweeper_state.lock().unwrap().best_block + self.sweeper_state.lock().unwrap().persistent.best_block } /// Regenerates and broadcasts the spending transaction for any outputs that are pending. This method will be a @@ -505,8 +506,9 @@ where { let sweeper_state = self.sweeper_state.lock().unwrap(); - let cur_height = sweeper_state.best_block.height; - let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height)); + let cur_height = sweeper_state.persistent.best_block.height; + let has_respends = + sweeper_state.persistent.outputs.iter().any(|o| filter_fn(o, cur_height)); if !has_respends { return Ok(()); } @@ -520,10 +522,11 @@ where { let mut sweeper_state = self.sweeper_state.lock().unwrap(); - let cur_height = sweeper_state.best_block.height; - let cur_hash = sweeper_state.best_block.block_hash; + let cur_height = sweeper_state.persistent.best_block.height; + let cur_hash = sweeper_state.persistent.best_block.block_hash; let respend_descriptors: Vec<&SpendableOutputDescriptor> = sweeper_state + .persistent .outputs .iter() .filter(|o| filter_fn(*o, cur_height)) @@ -536,7 +539,11 @@ where } let spending_tx = self - .spend_outputs(&sweeper_state, &respend_descriptors, change_destination_script) + .spend_outputs( + &sweeper_state.persistent, + &respend_descriptors, + change_destination_script, + ) .map_err(|e| { log_error!(self.logger, "Error spending outputs: {:?}", e); })?; @@ -550,7 +557,7 @@ where // As we didn't modify the state so far, the same filter_fn yields the same elements as // above. let respend_outputs = - sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height)); + sweeper_state.persistent.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height)); for output_info in respend_outputs { if let Some(filter) = self.chain_data_source.as_ref() { let watched_output = output_info.to_watched_output(cur_hash); @@ -560,7 +567,7 @@ where output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone()); } - self.persist_state(&sweeper_state).map_err(|e| { + self.persist_state(&sweeper_state.persistent).map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); })?; @@ -571,10 +578,10 @@ where } fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) { - let cur_height = sweeper_state.best_block.height; + let cur_height = sweeper_state.persistent.best_block.height; // Prune all outputs that have sufficient depth by now. - sweeper_state.outputs.retain(|o| { + sweeper_state.persistent.outputs.retain(|o| { if let Some(confirmation_height) = o.status.confirmation_height() { // We wait at least `PRUNE_DELAY_BLOCKS` as before that // `Event::SpendableOutputs` from lingering monitors might get replayed. @@ -590,7 +597,7 @@ where }); } - fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> { + fn persist_state(&self, sweeper_state: &PersistentSweeperState) -> Result<(), io::Error> { self.kv_store .write( OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -612,7 +619,7 @@ where } fn spend_outputs( - &self, sweeper_state: &SweeperState, descriptors: &[&SpendableOutputDescriptor], + &self, sweeper_state: &PersistentSweeperState, descriptors: &[&SpendableOutputDescriptor], change_destination_script: ScriptBuf, ) -> Result { let tx_feerate = @@ -635,7 +642,7 @@ where ) { let confirmation_hash = header.block_hash(); for (_, tx) in txdata { - for output_info in sweeper_state.outputs.iter_mut() { + for output_info in sweeper_state.persistent.outputs.iter_mut() { if output_info.is_spent_in(*tx) { output_info.status.confirmed(confirmation_hash, height, (*tx).clone()) } @@ -646,7 +653,7 @@ where fn best_block_updated_internal( &self, sweeper_state: &mut SweeperState, header: &Header, height: u32, ) { - sweeper_state.best_block = BestBlock::new(header.block_hash(), height); + sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height); self.prune_confirmed_outputs(sweeper_state); } } @@ -666,15 +673,15 @@ where &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, ) { let mut state_lock = self.sweeper_state.lock().unwrap(); - assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash, + assert_eq!(state_lock.persistent.best_block.block_hash, header.prev_blockhash, "Blocks must be connected in chain-order - the connected header must build on the last connected header"); - assert_eq!(state_lock.best_block.height, height - 1, + assert_eq!(state_lock.persistent.best_block.height, height - 1, "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); self.best_block_updated_internal(&mut *state_lock, header, height); - let _ = self.persist_state(&*state_lock).map_err(|e| { + let _ = self.persist_state(&state_lock.persistent).map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); }); } @@ -685,20 +692,20 @@ where let new_height = height - 1; let block_hash = header.block_hash(); - assert_eq!(state_lock.best_block.block_hash, block_hash, + assert_eq!(state_lock.persistent.best_block.block_hash, block_hash, "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); - assert_eq!(state_lock.best_block.height, height, + assert_eq!(state_lock.persistent.best_block.height, height, "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); - state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height); + state_lock.persistent.best_block = BestBlock::new(header.prev_blockhash, new_height); - for output_info in state_lock.outputs.iter_mut() { + for output_info in state_lock.persistent.outputs.iter_mut() { if output_info.status.confirmation_hash() == Some(block_hash) { debug_assert_eq!(output_info.status.confirmation_height(), Some(height)); output_info.status.unconfirmed(); } } - self.persist_state(&*state_lock).unwrap_or_else(|e| { + self.persist_state(&state_lock.persistent).unwrap_or_else(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); }); } @@ -720,7 +727,7 @@ where ) { let mut state_lock = self.sweeper_state.lock().unwrap(); self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); - self.persist_state(&*state_lock).unwrap_or_else(|e| { + self.persist_state(&state_lock.persistent).unwrap_or_else(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); }); } @@ -730,6 +737,7 @@ where // Get what height was unconfirmed. let unconf_height = state_lock + .persistent .outputs .iter() .find(|o| o.status.latest_spending_tx().map(|tx| tx.compute_txid()) == Some(*txid)) @@ -738,12 +746,13 @@ where if let Some(unconf_height) = unconf_height { // Unconfirm all >= this height. state_lock + .persistent .outputs .iter_mut() .filter(|o| o.status.confirmation_height() >= Some(unconf_height)) .for_each(|o| o.status.unconfirmed()); - self.persist_state(&*state_lock).unwrap_or_else(|e| { + self.persist_state(&state_lock.persistent).unwrap_or_else(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); }); } @@ -752,7 +761,7 @@ where fn best_block_updated(&self, header: &Header, height: u32) { let mut state_lock = self.sweeper_state.lock().unwrap(); self.best_block_updated_internal(&mut *state_lock, header, height); - let _ = self.persist_state(&*state_lock).map_err(|e| { + let _ = self.persist_state(&state_lock.persistent).map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); }); } @@ -760,6 +769,7 @@ where fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { let state_lock = self.sweeper_state.lock().unwrap(); state_lock + .persistent .outputs .iter() .filter_map(|o| match o.status { @@ -779,13 +789,18 @@ where } } -#[derive(Debug, Clone)] +#[derive(Debug)] struct SweeperState { + persistent: PersistentSweeperState, +} + +#[derive(Debug, Clone)] +struct PersistentSweeperState { outputs: Vec, best_block: BestBlock, } -impl_writeable_tlv_based!(SweeperState, { +impl_writeable_tlv_based!(PersistentSweeperState, { (0, outputs, required_vec), (2, best_block, required), }); @@ -831,7 +846,7 @@ where kv_store, logger, ) = args; - let state = SweeperState::read(reader)?; + let state = PersistentSweeperState::read(reader)?; let best_block = state.best_block; if let Some(filter) = chain_data_source.as_ref() { @@ -841,7 +856,7 @@ where } } - let sweeper_state = Mutex::new(state); + let sweeper_state = Mutex::new(SweeperState { persistent: state }); Ok(Self { sweeper_state, pending_sweep: AtomicBool::new(false), @@ -880,7 +895,7 @@ where kv_store, logger, ) = args; - let state = SweeperState::read(reader)?; + let state = PersistentSweeperState::read(reader)?; let best_block = state.best_block; if let Some(filter) = chain_data_source.as_ref() { @@ -890,7 +905,7 @@ where } } - let sweeper_state = Mutex::new(state); + let sweeper_state = Mutex::new(SweeperState { persistent: state }); Ok(( best_block, OutputSweeper { From 84ce2f238c9432a0349717db87f1d7b9044751b0 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Jun 2025 14:54:25 +0200 Subject: [PATCH 2/2] Persist sweeper state as part of background process To prepare for an async kv store trait that must be awaited, this commit moves the kv store calls from the chain notification handlers to the background process. It uses a dirty flag to communicate that there is something to persist. The block height is part of the persisted data. If that data does not make it to disk, the chain notifications are replayed after restart. --- lightning/src/util/sweep.rs | 60 +++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index a80e6b524c1..dfdd078bc8e 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -382,8 +382,10 @@ where output_spender: O, change_destination_source: D, kv_store: K, logger: L, ) -> Self { let outputs = Vec::new(); - let sweeper_state = - Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } }); + let sweeper_state = Mutex::new(SweeperState { + persistent: PersistentSweeperState { outputs, best_block }, + dirty: false, + }); Self { sweeper_state, pending_sweep: AtomicBool::new(false), @@ -447,7 +449,10 @@ where } self.persist_state(&state_lock.persistent).map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }) + })?; + state_lock.dirty = false; + + Ok(()) } /// Returns a list of the currently tracked spendable outputs. @@ -504,12 +509,20 @@ where // See if there is anything to sweep before requesting a change address. { - let sweeper_state = self.sweeper_state.lock().unwrap(); + let mut sweeper_state = self.sweeper_state.lock().unwrap(); let cur_height = sweeper_state.persistent.best_block.height; let has_respends = sweeper_state.persistent.outputs.iter().any(|o| filter_fn(o, cur_height)); if !has_respends { + // If there is nothing to sweep, we still persist the state if it is dirty. + if sweeper_state.dirty { + self.persist_state(&sweeper_state.persistent).map_err(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + })?; + sweeper_state.dirty = false; + } + return Ok(()); } } @@ -534,7 +547,8 @@ where .collect(); if respend_descriptors.is_empty() { - // It could be that a tx confirmed and there is now nothing to sweep anymore. + // It could be that a tx confirmed and there is now nothing to sweep anymore. If there is dirty state, + // we'll persist it in the next cycle. return Ok(()); } @@ -570,6 +584,7 @@ where self.persist_state(&sweeper_state.persistent).map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); })?; + sweeper_state.dirty = false; self.broadcaster.broadcast_transactions(&[&spending_tx]); } @@ -595,6 +610,8 @@ where } true }); + + sweeper_state.dirty = true; } fn persist_state(&self, sweeper_state: &PersistentSweeperState) -> Result<(), io::Error> { @@ -648,6 +665,8 @@ where } } } + + sweeper_state.dirty = true; } fn best_block_updated_internal( @@ -655,6 +674,8 @@ where ) { sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height); self.prune_confirmed_outputs(sweeper_state); + + sweeper_state.dirty = true; } } @@ -678,12 +699,8 @@ where assert_eq!(state_lock.persistent.best_block.height, height - 1, "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); - self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); - self.best_block_updated_internal(&mut *state_lock, header, height); - - let _ = self.persist_state(&state_lock.persistent).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + self.transactions_confirmed_internal(&mut state_lock, header, txdata, height); + self.best_block_updated_internal(&mut state_lock, header, height); } fn block_disconnected(&self, header: &Header, height: u32) { @@ -705,9 +722,7 @@ where } } - self.persist_state(&state_lock.persistent).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + state_lock.dirty = true; } } @@ -727,9 +742,6 @@ where ) { let mut state_lock = self.sweeper_state.lock().unwrap(); self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); - self.persist_state(&state_lock.persistent).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); } fn transaction_unconfirmed(&self, txid: &Txid) { @@ -752,18 +764,13 @@ where .filter(|o| o.status.confirmation_height() >= Some(unconf_height)) .for_each(|o| o.status.unconfirmed()); - self.persist_state(&state_lock.persistent).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + state_lock.dirty = true; } } fn best_block_updated(&self, header: &Header, height: u32) { let mut state_lock = self.sweeper_state.lock().unwrap(); - self.best_block_updated_internal(&mut *state_lock, header, height); - let _ = self.persist_state(&state_lock.persistent).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + self.best_block_updated_internal(&mut state_lock, header, height); } fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { @@ -792,6 +799,7 @@ where #[derive(Debug)] struct SweeperState { persistent: PersistentSweeperState, + dirty: bool, } #[derive(Debug, Clone)] @@ -856,7 +864,7 @@ where } } - let sweeper_state = Mutex::new(SweeperState { persistent: state }); + let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false }); Ok(Self { sweeper_state, pending_sweep: AtomicBool::new(false), @@ -905,7 +913,7 @@ where } } - let sweeper_state = Mutex::new(SweeperState { persistent: state }); + let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false }); Ok(( best_block, OutputSweeper {