From 34fda753465da69fc3c0dfcf316b324d1693f50e Mon Sep 17 00:00:00 2001 From: Prabhat Verma Date: Thu, 12 Jun 2025 14:57:12 +0530 Subject: [PATCH 1/2] Add size-based optimization to MonitorUpdatingPersister Introduces an optimization to the MonitorUpdatingPersister to avoid writing differential updates for small channel monitors. When a channel monitor is smaller than a configurable threshold , the persister will now write the full monitor instead of an update. This avoids the I/O overhead of creating and managing many small update files for monitors that don't benefit significantly from differential updates. --- lightning/src/util/persist.rs | 48 +++++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 6f1f9d0862a..a351495679c 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -446,6 +446,13 @@ where /// If you have many stale updates stored (such as after a crash with pending lazy deletes), and /// would like to get rid of them, consider using the /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. +/// +/// # Size-based persistence optimization +/// +/// For small channel monitors (below `min_monitor_size_for_updates_bytes` bytes when serialized), +/// this persister will always write the full monitor instead of individual updates. This avoids +/// the overhead of managing update files and later compaction for tiny monitors that don't benefit +/// from differential updates. pub struct MonitorUpdatingPersister where K::Target: KVStore, @@ -458,6 +465,7 @@ where kv_store: K, logger: L, maximum_pending_updates: u64, + min_monitor_size_for_updates_bytes: usize, entropy_source: ES, signer_provider: SP, broadcaster: BI, @@ -475,7 +483,7 @@ where BI::Target: BroadcasterInterface, FE::Target: FeeEstimator, { - /// Constructs a new [`MonitorUpdatingPersister`]. + /// Constructs a new [`MonitorUpdatingPersister`] with a default minimum monitor size threshold. /// /// The `maximum_pending_updates` parameter controls how many updates may be stored before a /// [`MonitorUpdatingPersister`] consolidates updates by writing a full monitor. Note that @@ -491,14 +499,45 @@ where /// less frequent "waves." /// - [`MonitorUpdatingPersister`] will potentially have more listing to do if you need to run /// [`MonitorUpdatingPersister::cleanup_stale_updates`]. + /// + /// This sets `min_monitor_size_for_updates_bytes` to 4096 bytes (4 KiB), which is a reasonable + /// default for most use cases. Monitors smaller than this will be persisted in full rather than + /// using update-based persistence. Use [`MonitorUpdatingPersister::new_with_monitor_size_threshold`] + /// if you need a custom threshold. pub fn new( kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE, + ) -> Self { + Self::new_with_monitor_size_threshold( + kv_store, + logger, + maximum_pending_updates, + 4096, + entropy_source, + signer_provider, + broadcaster, + fee_estimator, + ) + } + + /// Constructs a new [`MonitorUpdatingPersister`] with a custom minimum monitor size threshold. + /// + /// The `min_monitor_size_for_updates_bytes` parameter sets the minimum serialized size (in bytes) + /// for a [`ChannelMonitor`] to use update-based persistence. Monitors smaller than this threshold + /// will always be persisted in full, avoiding the overhead of managing update files for tiny + /// monitors. Set to 0 to always use update-based persistence regardless of size. + /// + /// For other parameters, see [`MonitorUpdatingPersister::new`]. + pub fn new_with_monitor_size_threshold( + kv_store: K, logger: L, maximum_pending_updates: u64, + min_monitor_size_for_updates_bytes: usize, entropy_source: ES, signer_provider: SP, + broadcaster: BI, fee_estimator: FE, ) -> Self { MonitorUpdatingPersister { kv_store, logger, maximum_pending_updates, + min_monitor_size_for_updates_bytes, entropy_source, signer_provider, broadcaster, @@ -752,7 +791,12 @@ where ) -> chain::ChannelMonitorUpdateStatus { const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; if let Some(update) = update { - let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID + // Check if monitor is too small for update-based persistence + let monitor_size = monitor.serialized_length(); + let use_full_persistence = monitor_size < self.min_monitor_size_for_updates_bytes; + + let persist_update = !use_full_persistence + && update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID && update.update_id % self.maximum_pending_updates != 0; if persist_update { let monitor_key = monitor_name.to_string(); From b313e395f6025df029b9ee15784e452a14f1c39e Mon Sep 17 00:00:00 2001 From: Prabhat Verma Date: Thu, 12 Jun 2025 14:57:22 +0530 Subject: [PATCH 2/2] Add tests for persister size-based optimization Adds unit test for introduced size based optimisation. Also updates the old unit test to use a threshold value and use constructors to increase test coverage --- lightning/src/util/persist.rs | 163 ++++++++++++++++++++++++++++++---- 1 file changed, 145 insertions(+), 18 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index a351495679c..d46d6ca1cb5 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -1200,6 +1200,7 @@ mod tests { kv_store: &TestStore::new(false), logger: &TestLogger::new(), maximum_pending_updates: persister_0_max_pending_updates, + min_monitor_size_for_updates_bytes: 0, entropy_source: &chanmon_cfgs[0].keys_manager, signer_provider: &chanmon_cfgs[0].keys_manager, broadcaster: &chanmon_cfgs[0].tx_broadcaster, @@ -1209,6 +1210,7 @@ mod tests { kv_store: &TestStore::new(false), logger: &TestLogger::new(), maximum_pending_updates: persister_1_max_pending_updates, + min_monitor_size_for_updates_bytes: 0, entropy_source: &chanmon_cfgs[1].keys_manager, signer_provider: &chanmon_cfgs[1].keys_manager, broadcaster: &chanmon_cfgs[1].tx_broadcaster, @@ -1374,6 +1376,7 @@ mod tests { kv_store: &TestStore::new(true), logger: &TestLogger::new(), maximum_pending_updates: 11, + min_monitor_size_for_updates_bytes: 0, entropy_source: node_cfgs[0].keys_manager, signer_provider: node_cfgs[0].keys_manager, broadcaster: node_cfgs[0].tx_broadcaster, @@ -1416,24 +1419,36 @@ mod tests { fn clean_stale_updates_works() { let test_max_pending_updates = 7; let chanmon_cfgs = create_chanmon_cfgs(3); - let persister_0 = MonitorUpdatingPersister { - kv_store: &TestStore::new(false), - logger: &TestLogger::new(), - maximum_pending_updates: test_max_pending_updates, - entropy_source: &chanmon_cfgs[0].keys_manager, - signer_provider: &chanmon_cfgs[0].keys_manager, - broadcaster: &chanmon_cfgs[0].tx_broadcaster, - fee_estimator: &chanmon_cfgs[0].fee_estimator, - }; - let persister_1 = MonitorUpdatingPersister { - kv_store: &TestStore::new(false), - logger: &TestLogger::new(), - maximum_pending_updates: test_max_pending_updates, - entropy_source: &chanmon_cfgs[1].keys_manager, - signer_provider: &chanmon_cfgs[1].keys_manager, - broadcaster: &chanmon_cfgs[1].tx_broadcaster, - fee_estimator: &chanmon_cfgs[1].fee_estimator, - }; + let store_0 = TestStore::new(false); + let logger_0 = TestLogger::new(); + let store_1 = TestStore::new(false); + let logger_1 = TestLogger::new(); + + // Test the default new() constructor (uses 4096 byte threshold) + let persister_0 = MonitorUpdatingPersister::new( + &store_0, + &logger_0, + test_max_pending_updates, + &chanmon_cfgs[0].keys_manager, + &chanmon_cfgs[0].keys_manager, + &chanmon_cfgs[0].tx_broadcaster, + &chanmon_cfgs[0].fee_estimator, + ); + // Test the custom threshold constructor with zero threshold + let persister_1 = MonitorUpdatingPersister::new_with_monitor_size_threshold( + &store_1, + &logger_1, + test_max_pending_updates, + 0, // 0 byte threshold for maximum update usage + &chanmon_cfgs[1].keys_manager, + &chanmon_cfgs[1].keys_manager, + &chanmon_cfgs[1].tx_broadcaster, + &chanmon_cfgs[1].fee_estimator, + ); + + // Verify the constructors set the thresholds correctly + assert_eq!(persister_0.min_monitor_size_for_updates_bytes, 4096); + assert_eq!(persister_1.min_monitor_size_for_updates_bytes, 0); let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let chain_mon_0 = test_utils::TestChainMonitor::new( Some(&chanmon_cfgs[0].chain_source), @@ -1496,6 +1511,118 @@ mod tests { .is_err()); } + #[test] + fn test_size_based_optimization() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let store_0 = TestStore::new(false); + let logger_0 = TestLogger::new(); + let store_1 = TestStore::new(false); + let logger_1 = TestLogger::new(); + + // Create a persister with a huge minimum size threshold (100KB) + // This should force all monitors to use full persistence instead of updates + // Test the new_with_monitor_size_threshold constructor with large threshold + let large_threshold_persister = MonitorUpdatingPersister::new_with_monitor_size_threshold( + &store_0, + &logger_0, + 5, + 100_000, + &chanmon_cfgs[0].keys_manager, + &chanmon_cfgs[0].keys_manager, + &chanmon_cfgs[0].tx_broadcaster, + &chanmon_cfgs[0].fee_estimator, + ); + + // Create a persister with zero minimum size threshold + // This should allow all monitors to use update-based persistence + // Test the new_with_monitor_size_threshold constructor with zero threshold + let small_threshold_persister = MonitorUpdatingPersister::new_with_monitor_size_threshold( + &store_1, + &logger_1, + 5, + 0, // allows all monitors to use updates + &chanmon_cfgs[1].keys_manager, + &chanmon_cfgs[1].keys_manager, + &chanmon_cfgs[1].tx_broadcaster, + &chanmon_cfgs[1].fee_estimator, + ); + + // Verify the constructors set the thresholds correctly + assert_eq!(large_threshold_persister.min_monitor_size_for_updates_bytes, 100_000); + assert_eq!(small_threshold_persister.min_monitor_size_for_updates_bytes, 0); + + let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let chain_mon_0 = test_utils::TestChainMonitor::new( + Some(&chanmon_cfgs[0].chain_source), + &chanmon_cfgs[0].tx_broadcaster, + &chanmon_cfgs[0].logger, + &chanmon_cfgs[0].fee_estimator, + &large_threshold_persister, + &chanmon_cfgs[0].keys_manager, + ); + let chain_mon_1 = test_utils::TestChainMonitor::new( + Some(&chanmon_cfgs[1].chain_source), + &chanmon_cfgs[1].tx_broadcaster, + &chanmon_cfgs[1].logger, + &chanmon_cfgs[1].fee_estimator, + &small_threshold_persister, + &chanmon_cfgs[1].keys_manager, + ); + node_cfgs[0].chain_monitor = chain_mon_0; + node_cfgs[1].chain_monitor = chain_mon_1; + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + // Create a channel and make a payment to trigger monitor updates + let _ = create_announced_chan_between_nodes(&nodes, 0, 1); + send_payment(&nodes[0], &vec![&nodes[1]][..], 1_000_000); + + // Test passes if we can create the channels and send payments without issues. + // The actual verification is that the different persisters behave differently + // based on their thresholds, which we can verify by ensuring no panics occur. + + // Verify that monitors were created + let persisted_data_0 = + large_threshold_persister.read_all_channel_monitors_with_updates().unwrap(); + let persisted_data_1 = + small_threshold_persister.read_all_channel_monitors_with_updates().unwrap(); + + assert_eq!(persisted_data_0.len(), 1); + assert_eq!(persisted_data_1.len(), 1); + + // Verify the monitors exist and are of reasonable size + for (_, monitor) in persisted_data_0.iter() { + let monitor_size = monitor.serialized_length(); + // Verify the monitor is not empty and reasonably sized + assert!( + monitor_size > 1000, + "Monitor should be at least 1KB in size, got {} bytes", + monitor_size + ); + assert!( + monitor_size < 100_000, + "Monitor should be smaller than 100KB threshold, got {} bytes", + monitor_size + ); + } + + for (_, monitor) in persisted_data_1.iter() { + let monitor_size = monitor.serialized_length(); + // Verify the monitor is not empty and reasonably sized + assert!( + monitor_size > 1000, + "Monitor should be at least 1KB in size, got {} bytes", + monitor_size + ); + // Since threshold is 0, this monitor should be large enough to use updates + assert!( + monitor_size > 0, + "Monitor should be larger than 0 byte threshold, got {} bytes", + monitor_size + ); + } + } + fn persist_fn(_persist: P) -> bool where P::Target: Persist,