Skip to content

feat: Make MonitorUpdatingPersister change persist type based on size #3834

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 191 additions & 20 deletions lightning/src/util/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,13 @@ where
/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
/// would like to get rid of them, consider using the
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
///
/// # Size-based persistence optimization
///
/// For small channel monitors (below `min_monitor_size_for_updates_bytes` bytes when serialized),
/// this persister will always write the full monitor instead of individual updates. This avoids
/// the overhead of managing update files and later compaction for tiny monitors that don't benefit
/// from differential updates.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still not clear to me how much the gain is of this in practice. Also worried that disabling the incremental path initially allow certain bugs to linger for longer, just because the path isn't hit as much, or rarely.

pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
where
K::Target: KVStore,
Expand All @@ -458,6 +465,7 @@ where
kv_store: K,
logger: L,
maximum_pending_updates: u64,
min_monitor_size_for_updates_bytes: usize,
entropy_source: ES,
signer_provider: SP,
broadcaster: BI,
Expand All @@ -475,7 +483,7 @@ where
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator,
{
/// Constructs a new [`MonitorUpdatingPersister`].
/// Constructs a new [`MonitorUpdatingPersister`] with a default minimum monitor size threshold.
///
/// The `maximum_pending_updates` parameter controls how many updates may be stored before a
/// [`MonitorUpdatingPersister`] consolidates updates by writing a full monitor. Note that
Expand All @@ -491,14 +499,45 @@ where
/// less frequent "waves."
/// - [`MonitorUpdatingPersister`] will potentially have more listing to do if you need to run
/// [`MonitorUpdatingPersister::cleanup_stale_updates`].
///
/// This sets `min_monitor_size_for_updates_bytes` to 4096 bytes (4 KiB), which is a reasonable
/// default for most use cases. Monitors smaller than this will be persisted in full rather than
/// using update-based persistence. Use [`MonitorUpdatingPersister::new_with_monitor_size_threshold`]
/// if you need a custom threshold.
pub fn new(
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
signer_provider: SP, broadcaster: BI, fee_estimator: FE,
) -> Self {
Self::new_with_monitor_size_threshold(
kv_store,
logger,
maximum_pending_updates,
4096,
entropy_source,
signer_provider,
broadcaster,
fee_estimator,
)
}

/// Constructs a new [`MonitorUpdatingPersister`] with a custom minimum monitor size threshold.
///
/// The `min_monitor_size_for_updates_bytes` parameter sets the minimum serialized size (in bytes)
/// for a [`ChannelMonitor`] to use update-based persistence. Monitors smaller than this threshold
/// will always be persisted in full, avoiding the overhead of managing update files for tiny
/// monitors. Set to 0 to always use update-based persistence regardless of size.
///
/// For other parameters, see [`MonitorUpdatingPersister::new`].
pub fn new_with_monitor_size_threshold(
kv_store: K, logger: L, maximum_pending_updates: u64,
min_monitor_size_for_updates_bytes: usize, entropy_source: ES, signer_provider: SP,
broadcaster: BI, fee_estimator: FE,
) -> Self {
MonitorUpdatingPersister {
kv_store,
logger,
maximum_pending_updates,
min_monitor_size_for_updates_bytes,
entropy_source,
signer_provider,
broadcaster,
Expand Down Expand Up @@ -752,7 +791,12 @@ where
) -> chain::ChannelMonitorUpdateStatus {
const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
if let Some(update) = update {
let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
// Check if monitor is too small for update-based persistence
let monitor_size = monitor.serialized_length();
let use_full_persistence = monitor_size < self.min_monitor_size_for_updates_bytes;

let persist_update = !use_full_persistence
&& update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
&& update.update_id % self.maximum_pending_updates != 0;
if persist_update {
let monitor_key = monitor_name.to_string();
Expand Down Expand Up @@ -1156,6 +1200,7 @@ mod tests {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: persister_0_max_pending_updates,
min_monitor_size_for_updates_bytes: 0,
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
Expand All @@ -1165,6 +1210,7 @@ mod tests {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: persister_1_max_pending_updates,
min_monitor_size_for_updates_bytes: 0,
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
Expand Down Expand Up @@ -1330,6 +1376,7 @@ mod tests {
kv_store: &TestStore::new(true),
logger: &TestLogger::new(),
maximum_pending_updates: 11,
min_monitor_size_for_updates_bytes: 0,
entropy_source: node_cfgs[0].keys_manager,
signer_provider: node_cfgs[0].keys_manager,
broadcaster: node_cfgs[0].tx_broadcaster,
Expand Down Expand Up @@ -1372,24 +1419,36 @@ mod tests {
fn clean_stale_updates_works() {
let test_max_pending_updates = 7;
let chanmon_cfgs = create_chanmon_cfgs(3);
let persister_0 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: test_max_pending_updates,
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
fee_estimator: &chanmon_cfgs[0].fee_estimator,
};
let persister_1 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: test_max_pending_updates,
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
fee_estimator: &chanmon_cfgs[1].fee_estimator,
};
let store_0 = TestStore::new(false);
let logger_0 = TestLogger::new();
let store_1 = TestStore::new(false);
let logger_1 = TestLogger::new();

// Test the default new() constructor (uses 4096 byte threshold)
let persister_0 = MonitorUpdatingPersister::new(
&store_0,
&logger_0,
test_max_pending_updates,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].fee_estimator,
);
// Test the custom threshold constructor with zero threshold
let persister_1 = MonitorUpdatingPersister::new_with_monitor_size_threshold(
&store_1,
&logger_1,
test_max_pending_updates,
0, // 0 byte threshold for maximum update usage
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].fee_estimator,
);

// Verify the constructors set the thresholds correctly
assert_eq!(persister_0.min_monitor_size_for_updates_bytes, 4096);
assert_eq!(persister_1.min_monitor_size_for_updates_bytes, 0);
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[0].chain_source),
Expand Down Expand Up @@ -1452,6 +1511,118 @@ mod tests {
.is_err());
}

#[test]
fn test_size_based_optimization() {
let chanmon_cfgs = create_chanmon_cfgs(2);
let store_0 = TestStore::new(false);
let logger_0 = TestLogger::new();
let store_1 = TestStore::new(false);
let logger_1 = TestLogger::new();

// Create a persister with a huge minimum size threshold (100KB)
// This should force all monitors to use full persistence instead of updates
// Test the new_with_monitor_size_threshold constructor with large threshold
let large_threshold_persister = MonitorUpdatingPersister::new_with_monitor_size_threshold(
&store_0,
&logger_0,
5,
100_000,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].fee_estimator,
);

// Create a persister with zero minimum size threshold
// This should allow all monitors to use update-based persistence
// Test the new_with_monitor_size_threshold constructor with zero threshold
let small_threshold_persister = MonitorUpdatingPersister::new_with_monitor_size_threshold(
&store_1,
&logger_1,
5,
0, // allows all monitors to use updates
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].fee_estimator,
);

// Verify the constructors set the thresholds correctly
assert_eq!(large_threshold_persister.min_monitor_size_for_updates_bytes, 100_000);
assert_eq!(small_threshold_persister.min_monitor_size_for_updates_bytes, 0);

let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[0].chain_source),
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].logger,
&chanmon_cfgs[0].fee_estimator,
&large_threshold_persister,
&chanmon_cfgs[0].keys_manager,
);
let chain_mon_1 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[1].chain_source),
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].logger,
&chanmon_cfgs[1].fee_estimator,
&small_threshold_persister,
&chanmon_cfgs[1].keys_manager,
);
node_cfgs[0].chain_monitor = chain_mon_0;
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

// Create a channel and make a payment to trigger monitor updates
let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
send_payment(&nodes[0], &vec![&nodes[1]][..], 1_000_000);

// Test passes if we can create the channels and send payments without issues.
// The actual verification is that the different persisters behave differently
// based on their thresholds, which we can verify by ensuring no panics occur.

// Verify that monitors were created
let persisted_data_0 =
large_threshold_persister.read_all_channel_monitors_with_updates().unwrap();
let persisted_data_1 =
small_threshold_persister.read_all_channel_monitors_with_updates().unwrap();

assert_eq!(persisted_data_0.len(), 1);
assert_eq!(persisted_data_1.len(), 1);

// Verify the monitors exist and are of reasonable size
for (_, monitor) in persisted_data_0.iter() {
let monitor_size = monitor.serialized_length();
// Verify the monitor is not empty and reasonably sized
assert!(
monitor_size > 1000,
"Monitor should be at least 1KB in size, got {} bytes",
monitor_size
);
assert!(
monitor_size < 100_000,
"Monitor should be smaller than 100KB threshold, got {} bytes",
monitor_size
);
}

for (_, monitor) in persisted_data_1.iter() {
let monitor_size = monitor.serialized_length();
// Verify the monitor is not empty and reasonably sized
assert!(
monitor_size > 1000,
"Monitor should be at least 1KB in size, got {} bytes",
monitor_size
);
// Since threshold is 0, this monitor should be large enough to use updates
assert!(
monitor_size > 0,
"Monitor should be larger than 0 byte threshold, got {} bytes",
monitor_size
);
}
}

fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool
where
P::Target: Persist<ChannelSigner>,
Expand Down
Loading