diff --git a/Cargo.toml b/Cargo.toml index caf960a7c..edc41ed31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,21 +32,21 @@ panic = 'abort' # Abort on panic default = [] [dependencies] -lightning = { version = "0.0.116", features = ["max_level_trace", "std"] } -lightning-invoice = { version = "0.24.0" } -lightning-net-tokio = { version = "0.0.116" } -lightning-persister = { version = "0.0.116" } -lightning-background-processor = { version = "0.0.116", features = ["futures"] } -lightning-rapid-gossip-sync = { version = "0.0.116" } -lightning-transaction-sync = { version = "0.0.116", features = ["esplora-async-https"] } - -#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["max_level_trace", "std"] } -#lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } -#lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } -#lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } -#lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["futures"] } -#lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } -#lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["esplora-async"] } +lightning = { version = "0.0.117", features = ["max_level_trace", "std"] } +lightning-invoice = { version = "0.25.0" } +lightning-net-tokio = { version = "0.0.117" } +lightning-persister = { version = "0.0.117" } +lightning-background-processor = { version = "0.0.117", features = ["futures"] } +lightning-rapid-gossip-sync = { version = "0.0.117" } +lightning-transaction-sync = { version = "0.0.117", features = ["esplora-async-https"] } + +# lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["max_level_trace", "std"] } +# lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } +# lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } +# lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } +# lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["futures"] } +# lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } +# lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["esplora-async"] } #lightning = { path = "../rust-lightning/lightning", features = ["max_level_trace", "std"] } #lightning-invoice = { path = "../rust-lightning/lightning-invoice" } @@ -57,6 +57,7 @@ lightning-transaction-sync = { version = "0.0.116", features = ["esplora-async-h #lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync", features = ["esplora-async"] } bdk = { version = "0.28.0", default-features = false, features = ["std", "async-interface", "use-esplora-async", "sqlite-bundled", "keys-bip39"]} + reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } rusqlite = { version = "0.28.0", features = ["bundled"] } bitcoin = "0.29.2" @@ -74,6 +75,8 @@ uniffi = { version = "0.23.0", features = ["build"], optional = true } winapi = { version = "0.3", features = ["winbase"] } 
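With the crate bumps above to `lightning` 0.0.117 and `lightning-invoice` 0.25.0, `NetAddress` is renamed to `SocketAddress` throughout the public API, as the README and UDL hunks below show. A minimal caller-side sketch of the rename, where `parse_peer` is a hypothetical helper and the `IP_ADDR:PORT` string format is unchanged:

```rust
use std::str::FromStr;

use ldk_node::bitcoin::secp256k1::PublicKey;
// Previously: use ldk_node::NetAddress;
use ldk_node::SocketAddress;

// Parse counterparty details the same way the README example does; only the
// type name changes with 0.0.117.
fn parse_peer(node_id_hex: &str, addr: &str) -> Option<(PublicKey, SocketAddress)> {
	let node_id = PublicKey::from_str(node_id_hex).ok()?;
	let addr = SocketAddress::from_str(addr).ok()?;
	Some((node_id, addr))
}
```

The resulting pair can then be handed to `connect_open_channel` exactly as before.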
[dev-dependencies] +lightning = { version = "0.0.117", features = ["max_level_trace", "std", "_test_utils"] } +#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["max_level_trace", "std", "_test_utils"] } electrsd = { version = "0.22.0", features = ["legacy", "esplora_a33e97e1", "bitcoind_23_0"] } electrum-client = "0.12.0" proptest = "1.0.0" diff --git a/README.md b/README.md index be55a849c..ef5ffe5aa 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ LDK Node is a self-custodial Lightning node in library form. Its central goal is The primary abstraction of the library is the [`Node`][api_docs_node], which can be retrieved by setting up and configuring a [`Builder`][api_docs_builder] to your liking and calling one of the `build` methods. `Node` can then be controlled via commands such as `start`, `stop`, `connect_open_channel`, `send_payment`, etc. ```rust -use ldk_node::{Builder, NetAddress}; +use ldk_node::{Builder, SocketAddress}; use ldk_node::lightning_invoice::Invoice; use ldk_node::bitcoin::secp256k1::PublicKey; use ldk_node::bitcoin::Network; @@ -32,7 +32,7 @@ fn main() { // .. fund address .. let node_id = PublicKey::from_str("NODE_ID").unwrap(); - let node_addr = NetAddress::from_str("IP_ADDR:PORT").unwrap(); + let node_addr = SocketAddress::from_str("IP_ADDR:PORT").unwrap(); node.connect_open_channel(node_id, node_addr, 10000, None, None, false).unwrap(); let event = node.wait_next_event(); diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 3b4a43e97..d924e0887 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -6,7 +6,7 @@ dictionary Config { string storage_dir_path = "/tmp/ldk_node/"; string? log_dir_path = null; Network network = "Bitcoin"; - NetAddress? listening_address = null; + SocketAddress? listening_address = null; u32 default_cltv_expiry_delta = 144; u64 onchain_wallet_sync_interval_secs = 80; u64 wallet_sync_interval_secs = 30; @@ -29,7 +29,7 @@ interface Builder { void set_gossip_source_rgs(string rgs_server_url); void set_storage_dir_path(string storage_dir_path); void set_network(Network network); - void set_listening_address(NetAddress listening_address); + void set_listening_address(SocketAddress listening_address); [Throws=BuildError] LDKNode build(); }; @@ -43,7 +43,7 @@ interface LDKNode { Event wait_next_event(); void event_handled(); PublicKey node_id(); - NetAddress? listening_address(); + SocketAddress? listening_address(); [Throws=NodeError] Address new_onchain_address(); [Throws=NodeError] @@ -55,11 +55,11 @@ interface LDKNode { [Throws=NodeError] u64 total_onchain_balance_sats(); [Throws=NodeError] - void connect(PublicKey node_id, NetAddress address, boolean persist); + void connect(PublicKey node_id, SocketAddress address, boolean persist); [Throws=NodeError] void disconnect(PublicKey node_id); [Throws=NodeError] - void connect_open_channel(PublicKey node_id, NetAddress address, u64 channel_amount_sats, u64? push_to_counterparty_msat, ChannelConfig? channel_config, boolean announce_channel); + void connect_open_channel(PublicKey node_id, SocketAddress address, u64 channel_amount_sats, u64? push_to_counterparty_msat, ChannelConfig? 
channel_config, boolean announce_channel); [Throws=NodeError] void close_channel([ByRef]ChannelId channel_id, PublicKey counterparty_node_id); [Throws=NodeError] @@ -73,9 +73,11 @@ interface LDKNode { [Throws=NodeError] PaymentHash send_spontaneous_payment(u64 amount_msat, PublicKey node_id); [Throws=NodeError] - void send_payment_probe([ByRef]Bolt11Invoice invoice); + void send_payment_probes([ByRef]Bolt11Invoice invoice); [Throws=NodeError] - void send_spontaneous_payment_probe(u64 amount_msat, PublicKey node_id); + void send_spontaneous_payment_probes(u64 amount_msat, PublicKey node_id); + [Throws=NodeError] + void send_payment_probes_using_amount([ByRef]Bolt11Invoice invoice, u64 amount_msat); [Throws=NodeError] Bolt11Invoice receive_payment(u64 amount_msat, [ByRef]string description, u32 expiry_secs); [Throws=NodeError] @@ -111,7 +113,7 @@ enum NodeError { "TxSyncFailed", "GossipUpdateFailed", "InvalidAddress", - "InvalidNetAddress", + "InvalidSocketAddress", "InvalidPublicKey", "InvalidSecretKey", "InvalidPaymentHash", @@ -130,9 +132,11 @@ enum BuildError { "InvalidSeedBytes", "InvalidSeedFile", "InvalidSystemTime", + "InvalidChannelMonitor", "ReadFailed", "WriteFailed", "StoragePathAccessFailed", + "KVStoreSetupFailed", "WalletSetupFailed", "LoggerSetupFailed", }; @@ -143,8 +147,8 @@ interface Event { PaymentFailed( PaymentHash payment_hash ); PaymentReceived( PaymentHash payment_hash, u64 amount_msat); ChannelPending ( ChannelId channel_id, UserChannelId user_channel_id, ChannelId former_temporary_channel_id, PublicKey counterparty_node_id, OutPoint funding_txo ); - ChannelReady ( ChannelId channel_id, UserChannelId user_channel_id ); - ChannelClosed ( ChannelId channel_id, UserChannelId user_channel_id ); + ChannelReady ( ChannelId channel_id, UserChannelId user_channel_id, PublicKey? counterparty_node_id ); + ChannelClosed ( ChannelId channel_id, UserChannelId user_channel_id, PublicKey? 
counterparty_node_id ); }; enum PaymentDirection { @@ -201,7 +205,7 @@ dictionary ChannelDetails { dictionary PeerDetails { PublicKey node_id; - NetAddress address; + SocketAddress address; boolean is_persisted; boolean is_connected; }; @@ -235,7 +239,7 @@ enum LogLevel { typedef string Txid; [Custom] -typedef string NetAddress; +typedef string SocketAddress; [Custom] typedef string PublicKey; diff --git a/src/builder.rs b/src/builder.rs index f34f950a9..c0645a7f6 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,15 +1,13 @@ use crate::event::EventQueue; use crate::gossip::GossipSource; use crate::io; -use crate::io::fs_store::FilesystemStore; use crate::io::sqlite_store::SqliteStore; -use crate::io::{KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE}; use crate::logger::{log_error, FilesystemLogger, Logger}; use crate::payment_store::PaymentStore; use crate::peer_store::PeerStore; use crate::types::{ - ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetAddress, - NetworkGraph, OnionMessenger, PeerManager, + ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetworkGraph, + OnionMessenger, PeerManager, SocketAddress, }; use crate::wallet::Wallet; use crate::LogLevel; @@ -29,8 +27,14 @@ use lightning::routing::scoring::{ use lightning::sign::EntropySource; use lightning::util::config::UserConfig; +use lightning::util::persist::{ + read_channel_monitors, KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, +}; use lightning::util::ser::ReadableArgs; +use lightning_persister::fs_store::FilesystemStore; + use lightning_transaction_sync::EsploraSyncClient; use bdk::bitcoin::secp256k1::Secp256k1; @@ -48,6 +52,8 @@ use std::convert::TryInto; use std::default::Default; use std::fmt; use std::fs; +use std::io::Cursor; +use std::path::PathBuf; use std::sync::{Arc, Mutex, RwLock}; use std::time::SystemTime; @@ -80,12 +86,16 @@ pub enum BuildError { InvalidSeedFile, /// The current system time is invalid, clocks might have gone backwards. InvalidSystemTime, + /// A read channel monitor is invalid. + InvalidChannelMonitor, /// We failed to read data from the [`KVStore`]. ReadFailed, /// We failed to write data to the [`KVStore`]. WriteFailed, /// We failed to access the given `storage_dir_path`. StoragePathAccessFailed, + /// We failed to setup our [`KVStore`]. + KVStoreSetupFailed, /// We failed to setup the onchain wallet. WalletSetupFailed, /// We failed to setup the logger. @@ -100,9 +110,13 @@ impl fmt::Display for BuildError { Self::InvalidSystemTime => { write!(f, "System time is invalid. Clocks might have gone back in time.") } + Self::InvalidChannelMonitor => { + write!(f, "Failed to watch a deserialized ChannelMonitor") + } Self::ReadFailed => write!(f, "Failed to read from store."), Self::WriteFailed => write!(f, "Failed to write to store."), Self::StoragePathAccessFailed => write!(f, "Failed to access the given storage path."), + Self::KVStoreSetupFailed => write!(f, "Failed to setup KVStore."), Self::WalletSetupFailed => write!(f, "Failed to setup onchain wallet."), Self::LoggerSetupFailed => write!(f, "Failed to setup the logger."), } @@ -155,8 +169,6 @@ impl NodeBuilder { } /// Configures the [`Node`] instance to source its wallet entropy from the given 64 seed bytes. - /// - /// **Note:** Panics if the length of the given `seed_bytes` differs from 64.
pub fn set_entropy_seed_bytes(&mut self, seed_bytes: Vec) -> Result<&mut Self, BuildError> { if seed_bytes.len() != WALLET_KEYS_SEED_LEN { return Err(BuildError::InvalidSeedBytes); @@ -217,7 +229,7 @@ impl NodeBuilder { } /// Sets the IP address and TCP port on which [`Node`] will listen for incoming network connections. - pub fn set_listening_address(&mut self, listening_address: NetAddress) -> &mut Self { + pub fn set_listening_address(&mut self, listening_address: SocketAddress) -> &mut Self { self.config.listening_address = Some(listening_address); self } @@ -234,17 +246,26 @@ impl NodeBuilder { let storage_dir_path = self.config.storage_dir_path.clone(); fs::create_dir_all(storage_dir_path.clone()) .map_err(|_| BuildError::StoragePathAccessFailed)?; - let kv_store = Arc::new(SqliteStore::new(storage_dir_path.into())); + let kv_store = Arc::new( + SqliteStore::new( + storage_dir_path.into(), + Some(io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|_| BuildError::KVStoreSetupFailed)?, + ); self.build_with_store(kv_store) } /// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options /// previously configured. pub fn build_with_fs_store(&self) -> Result, BuildError> { - let storage_dir_path = self.config.storage_dir_path.clone(); + let mut storage_dir_path: PathBuf = self.config.storage_dir_path.clone().into(); + storage_dir_path.push("fs_store"); + fs::create_dir_all(storage_dir_path.clone()) .map_err(|_| BuildError::StoragePathAccessFailed)?; - let kv_store = Arc::new(FilesystemStore::new(storage_dir_path.into())); + let kv_store = Arc::new(FilesystemStore::new(storage_dir_path)); self.build_with_store(kv_store) } @@ -353,7 +374,7 @@ impl ArcedNodeBuilder { } /// Sets the IP address and TCP port on which [`Node`] will listen for incoming network connections. 
- pub fn set_listening_address(&self, listening_address: NetAddress) { + pub fn set_listening_address(&self, listening_address: SocketAddress) { self.inner.write().unwrap().set_listening_address(listening_address); } @@ -510,7 +531,7 @@ fn build_with_store_internal( )); // Read ChannelMonitor state from store - let mut channel_monitors = match io::utils::read_channel_monitors( + let mut channel_monitors = match read_channel_monitors( Arc::clone(&kv_store), Arc::clone(&keys_manager), Arc::clone(&keys_manager), @@ -536,9 +557,12 @@ fn build_with_store_internal( user_config.manually_accept_inbound_channels = true; } let channel_manager = { - if let Ok(mut reader) = - kv_store.read(CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY) - { + if let Ok(res) = kv_store.read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) { + let mut reader = Cursor::new(res); let channel_monitor_references = channel_monitors.iter_mut().map(|(_, chanmon)| chanmon).collect(); let read_args = ChannelManagerReadArgs::new( @@ -589,7 +613,10 @@ fn build_with_store_internal( // Give ChannelMonitors to ChainMonitor for (_blockhash, channel_monitor) in channel_monitors.into_iter() { let funding_outpoint = channel_monitor.get_funding_txo().0; - chain_monitor.watch_channel(funding_outpoint, channel_monitor); + chain_monitor.watch_channel(funding_outpoint, channel_monitor).map_err(|e| { + log_error!(logger, "Failed to watch channel monitor: {:?}", e); + BuildError::InvalidChannelMonitor + })?; } // Initialize the PeerManager @@ -726,7 +753,7 @@ fn build_with_store_internal( gossip_source, kv_store, logger, - router, + _router: router, scorer, peer_store, payment_store, diff --git a/src/error.rs b/src/error.rs index 54ad0cdfd..b924fcda3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -38,7 +38,7 @@ pub enum Error { /// The given address is invalid. InvalidAddress, /// The given network address is invalid. - InvalidNetAddress, + InvalidSocketAddress, /// The given public key is invalid. InvalidPublicKey, /// The given secret key is invalid. 
@@ -85,7 +85,7 @@ impl fmt::Display for Error { Self::TxSyncFailed => write!(f, "Failed to sync transactions."), Self::GossipUpdateFailed => write!(f, "Failed to update gossip data."), Self::InvalidAddress => write!(f, "The given address is invalid."), - Self::InvalidNetAddress => write!(f, "The given network address is invalid."), + Self::InvalidSocketAddress => write!(f, "The given network address is invalid."), Self::InvalidPublicKey => write!(f, "The given public key is invalid."), Self::InvalidSecretKey => write!(f, "The given secret key is invalid."), Self::InvalidPaymentHash => write!(f, "The given payment hash is invalid."), diff --git a/src/event.rs b/src/event.rs index 31a115da5..6bc052f9b 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,22 +1,25 @@ use crate::{ - hex_utils, ChannelId, ChannelManager, Config, Error, KeysManager, NetworkGraph, UserChannelId, - Wallet, + hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, UserChannelId, Wallet, }; use crate::payment_store::{ PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentStatus, PaymentStore, }; -use crate::io::{KVStore, EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_NAMESPACE}; +use crate::io::{ + EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, +}; use crate::logger::{log_debug, log_error, log_info, Logger}; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::events::Event as LdkEvent; use lightning::events::PaymentPurpose; use lightning::impl_writeable_tlv_based_enum; -use lightning::ln::PaymentHash; +use lightning::ln::{ChannelId, PaymentHash}; use lightning::routing::gossip::NodeId; use lightning::util::errors::APIError; +use lightning::util::persist::KVStore; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use bitcoin::secp256k1::{PublicKey, Secp256k1}; @@ -68,6 +71,10 @@ pub enum Event { channel_id: ChannelId, /// The `user_channel_id` of the channel. user_channel_id: UserChannelId, + /// The `node_id` of the channel counterparty. + /// + /// This will be `None` for events serialized by LDK Node XXX TODO and prior. + counterparty_node_id: Option, }, /// A channel has been closed. ChannelClosed { @@ -75,6 +82,10 @@ pub enum Event { channel_id: ChannelId, /// The `user_channel_id` of the channel. user_channel_id: UserChannelId, + /// The `node_id` of the channel counterparty. + /// + /// This will be `None` for events serialized by LDK Node XXX TODO and prior. 
+ counterparty_node_id: Option, }, } @@ -91,6 +102,7 @@ impl_writeable_tlv_based_enum!(Event, }, (3, ChannelReady) => { (0, channel_id, required), + (1, counterparty_node_id, option), (2, user_channel_id, required), }, (4, ChannelPending) => { @@ -102,6 +114,7 @@ impl_writeable_tlv_based_enum!(Event, }, (5, ChannelClosed) => { (0, channel_id, required), + (1, counterparty_node_id, option), (2, user_channel_id, required), }; ); @@ -161,12 +174,18 @@ where fn persist_queue(&self, locked_queue: &VecDeque) -> Result<(), Error> { let data = EventQueueSerWrapper(locked_queue).encode(); self.kv_store - .write(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY, &data) + .write( + EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_KEY, + &data, + ) .map_err(|e| { log_error!( self.logger, - "Write for key {}/{} failed due to: {}", - EVENT_QUEUE_PERSISTENCE_NAMESPACE, + "Write for key {}/{}/{} failed due to: {}", + EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY, e ); @@ -402,6 +421,8 @@ where purpose, amount_msat, receiver_node_id: _, + htlcs: _, + sender_intended_total_msat: _, } => { log_info!( self.logger, @@ -552,7 +573,7 @@ where }); } } - LdkEvent::SpendableOutputs { outputs } => { + LdkEvent::SpendableOutputs { outputs, channel_id: _ } => { // TODO: We should eventually remember the outputs and supply them to the wallet's coin selection, once BDK allows us to do so. let destination_address = self.wallet.get_new_address().unwrap_or_else(|e| { log_error!(self.logger, "Failed to get destination address: {}", e); @@ -643,7 +664,7 @@ where let nodes = read_only_network_graph.nodes(); let channels = self.channel_manager.list_channels(); - let node_str = |channel_id: &Option<[u8; 32]>| { + let node_str = |channel_id: &Option| { channel_id .and_then(|channel_id| channels.iter().find(|c| c.channel_id == channel_id)) .and_then(|channel| { @@ -657,11 +678,9 @@ where }) }) }; - let channel_str = |channel_id: &Option<[u8; 32]>| { + let channel_str = |channel_id: &Option| { channel_id - .map(|channel_id| { - format!(" with channel {}", hex_utils::to_string(&channel_id)) - }) + .map(|channel_id| format!(" with channel {}", channel_id)) .unwrap_or_default() }; let from_prev_str = format!( @@ -704,16 +723,14 @@ where log_info!( self.logger, "New channel {} with counterparty {} has been created and is pending confirmation on chain.", - hex_utils::to_string(&channel_id), + channel_id, counterparty_node_id, ); self.event_queue .add_event(Event::ChannelPending { - channel_id: ChannelId(channel_id), + channel_id, user_channel_id: UserChannelId(user_channel_id), - former_temporary_channel_id: ChannelId( - former_temporary_channel_id.unwrap(), - ), + former_temporary_channel_id: former_temporary_channel_id.unwrap(), counterparty_node_id, funding_txo, }) @@ -728,30 +745,33 @@ where log_info!( self.logger, "Channel {} with counterparty {} ready to be used.", - hex_utils::to_string(&channel_id), + channel_id, counterparty_node_id, ); self.event_queue .add_event(Event::ChannelReady { - channel_id: ChannelId(channel_id), + channel_id, user_channel_id: UserChannelId(user_channel_id), + counterparty_node_id: Some(counterparty_node_id), }) .unwrap_or_else(|e| { log_error!(self.logger, "Failed to push to event queue: {}", e); panic!("Failed to push to event queue"); }); } - LdkEvent::ChannelClosed { channel_id, reason, user_channel_id } => { - log_info!( - self.logger, - "Channel 
{} closed due to: {:?}", - hex_utils::to_string(&channel_id), - reason - ); + LdkEvent::ChannelClosed { + channel_id, + reason, + user_channel_id, + counterparty_node_id, + .. + } => { + log_info!(self.logger, "Channel {} closed due to: {:?}", channel_id, reason); self.event_queue .add_event(Event::ChannelClosed { - channel_id: ChannelId(channel_id), + channel_id, user_channel_id: UserChannelId(user_channel_id), + counterparty_node_id, }) .unwrap_or_else(|e| { log_error!(self.logger, "Failed to push to event queue: {}", e); @@ -768,11 +788,12 @@ where #[cfg(test)] mod tests { use super::*; - use crate::test::utils::{TestLogger, TestStore}; + use crate::test::utils::TestLogger; + use lightning::util::test_utils::TestStore; #[test] fn event_queue_persistence() { - let store = Arc::new(TestStore::new()); + let store = Arc::new(TestStore::new(false)); let logger = Arc::new(TestLogger::new()); let event_queue = EventQueue::new(Arc::clone(&store), Arc::clone(&logger)); assert_eq!(event_queue.next_event(), None); @@ -780,29 +801,29 @@ mod tests { let expected_event = Event::ChannelReady { channel_id: ChannelId([23u8; 32]), user_channel_id: UserChannelId(2323), + counterparty_node_id: None, }; event_queue.add_event(expected_event.clone()).unwrap(); - assert!(store.get_and_clear_did_persist()); // Check we get the expected event and that it is returned until we mark it handled. for _ in 0..5 { assert_eq!(event_queue.wait_next_event(), expected_event); assert_eq!(event_queue.next_event(), Some(expected_event.clone())); - assert_eq!(false, store.get_and_clear_did_persist()); } // Check we can read back what we persisted. let persisted_bytes = store - .get_persisted_bytes(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY) + .read( + EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_KEY, + ) .unwrap(); let deser_event_queue = EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap(); assert_eq!(deser_event_queue.wait_next_event(), expected_event); - assert!(!store.get_and_clear_did_persist()); - // Check we persisted on `event_handled()` event_queue.event_handled().unwrap(); - - assert!(store.get_and_clear_did_persist()); + assert_eq!(event_queue.next_event(), None); } } diff --git a/src/io/fs_store.rs b/src/io/fs_store.rs deleted file mode 100644 index 4eb067bf1..000000000 --- a/src/io/fs_store.rs +++ /dev/null @@ -1,272 +0,0 @@ -#[cfg(target_os = "windows")] -extern crate winapi; - -use super::*; - -use std::collections::HashMap; -use std::fs; -use std::io::{BufReader, Read, Write}; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex, RwLock}; - -#[cfg(not(target_os = "windows"))] -use std::os::unix::io::AsRawFd; - -use lightning::util::persist::KVStorePersister; -use lightning::util::ser::Writeable; - -use rand::distributions::Alphanumeric; -use rand::{thread_rng, Rng}; - -#[cfg(target_os = "windows")] -use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt}; - -#[cfg(target_os = "windows")] -macro_rules! call { - ($e: expr) => { - if $e != 0 { - return Ok(()); - } else { - return Err(std::io::Error::last_os_error()); - } - }; -} - -#[cfg(target_os = "windows")] -fn path_to_windows_str>(path: T) -> Vec { - path.as_ref().encode_wide().chain(Some(0)).collect() -} - -/// A [`KVStore`] implementation that writes to and reads from the file system. 
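The doc comment above and the struct below belong to LDK Node's local `FilesystemStore`, which this diff deletes: the store now ships upstream, and `builder.rs` imports `lightning_persister::fs_store::FilesystemStore` instead. A minimal sketch of the upstream store under the new three-level addressing (the path and function name are illustrative only):

```rust
use lightning::util::persist::KVStore;
use lightning_persister::fs_store::FilesystemStore;

// Round-trip a value through the upstream FilesystemStore. The base directory
// is a placeholder; `new` takes it directly and is infallible.
fn fs_store_roundtrip() -> std::io::Result<()> {
	let store = FilesystemStore::new("/tmp/demo_fs_store".into());
	store.write("testspace", "testsubspace", "testkey", &[42u8; 32])?;
	let read = store.read("testspace", "testsubspace", "testkey")?;
	assert_eq!(read, vec![42u8; 32]);
	// `false` requests immediate rather than lazy removal.
	store.remove("testspace", "testsubspace", "testkey", false)?;
	Ok(())
}
```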
-pub struct FilesystemStore { - dest_dir: PathBuf, - locks: Mutex>>>, -} - -impl FilesystemStore { - pub(crate) fn new(mut dest_dir: PathBuf) -> Self { - dest_dir.push("fs_store"); - let locks = Mutex::new(HashMap::new()); - Self { dest_dir, locks } - } -} - -impl KVStore for FilesystemStore { - type Reader = FilesystemReader; - - fn read(&self, namespace: &str, key: &str) -> std::io::Result { - let mut outer_lock = self.locks.lock().unwrap(); - let lock_key = (namespace.to_string(), key.to_string()); - let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default()); - - let mut dest_file_path = self.dest_dir.clone(); - dest_file_path.push(namespace); - dest_file_path.push(key); - FilesystemReader::new(dest_file_path, inner_lock_ref) - } - - fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> { - let mut outer_lock = self.locks.lock().unwrap(); - let lock_key = (namespace.to_string(), key.to_string()); - let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default()); - let _guard = inner_lock_ref.write().unwrap(); - - let mut dest_file_path = self.dest_dir.clone(); - dest_file_path.push(namespace); - dest_file_path.push(key); - - let parent_directory = dest_file_path - .parent() - .ok_or_else(|| { - let msg = - format!("Could not retrieve parent directory of {}.", dest_file_path.display()); - std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) - })? - .to_path_buf(); - fs::create_dir_all(parent_directory.clone())?; - - // Do a crazy dance with lots of fsync()s to be overly cautious here... - // We never want to end up in a state where we've lost the old data, or end up using the - // old data on power loss after we've returned. - // The way to atomically write a file on Unix platforms is: - // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) - let mut tmp_file_path = dest_file_path.clone(); - let mut rng = thread_rng(); - let rand_str: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); - let ext = format!("{}.tmp", rand_str); - tmp_file_path.set_extension(ext); - - let mut tmp_file = fs::File::create(&tmp_file_path)?; - tmp_file.write_all(&buf)?; - tmp_file.sync_all()?; - - #[cfg(not(target_os = "windows"))] - { - fs::rename(&tmp_file_path, &dest_file_path)?; - let dir_file = fs::OpenOptions::new().read(true).open(parent_directory.clone())?; - unsafe { - libc::fsync(dir_file.as_raw_fd()); - } - } - - #[cfg(target_os = "windows")] - { - if dest_file_path.exists() { - unsafe { - winapi::um::winbase::ReplaceFileW( - path_to_windows_str(dest_file_path).as_ptr(), - path_to_windows_str(tmp_file_path).as_ptr(), - std::ptr::null(), - winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS, - std::ptr::null_mut() as *mut winapi::ctypes::c_void, - std::ptr::null_mut() as *mut winapi::ctypes::c_void, - ) - }; - } else { - call!(unsafe { - winapi::um::winbase::MoveFileExW( - path_to_windows_str(tmp_file_path).as_ptr(), - path_to_windows_str(dest_file_path).as_ptr(), - winapi::um::winbase::MOVEFILE_WRITE_THROUGH - | winapi::um::winbase::MOVEFILE_REPLACE_EXISTING, - ) - }); - } - } - Ok(()) - } - - fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> { - let mut outer_lock = self.locks.lock().unwrap(); - let lock_key = (namespace.to_string(), key.to_string()); - let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key.clone()).or_default()); - - let _guard = inner_lock_ref.write().unwrap(); - - let mut dest_file_path = self.dest_dir.clone(); - dest_file_path.push(namespace); - 
dest_file_path.push(key); - - if !dest_file_path.is_file() { - return Ok(()); - } - - fs::remove_file(&dest_file_path)?; - #[cfg(not(target_os = "windows"))] - { - let parent_directory = dest_file_path.parent().ok_or_else(|| { - let msg = - format!("Could not retrieve parent directory of {}.", dest_file_path.display()); - std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) - })?; - let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; - unsafe { - // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes - // to the inode might get cached (and hence possibly lost on crash), depending on - // the target platform and file system. - // - // In order to assert we permanently removed the file in question we therefore - // call `fsync` on the parent directory on platforms that support it, - libc::fsync(dir_file.as_raw_fd()); - } - } - - if dest_file_path.is_file() { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed")); - } - - if Arc::strong_count(&inner_lock_ref) == 2 { - // It's safe to remove the lock entry if we're the only one left holding a strong - // reference. Checking this is necessary to ensure we continue to distribute references to the - // same lock as long as some Readers are around. However, we still want to - // clean up the table when possible. - // - // Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are - // around, but is preferable to doing nothing *or* something overly complex such as - // implementing yet another RAII structure just for this pupose. - outer_lock.remove(&lock_key); - } - - // Garbage collect all lock entries that are not referenced anymore. - outer_lock.retain(|_, v| Arc::strong_count(&v) > 1); - - Ok(()) - } - - fn list(&self, namespace: &str) -> std::io::Result> { - let mut prefixed_dest = self.dest_dir.clone(); - prefixed_dest.push(namespace); - - let mut keys = Vec::new(); - - if !Path::new(&prefixed_dest).exists() { - return Ok(Vec::new()); - } - - for entry in fs::read_dir(prefixed_dest.clone())? { - let entry = entry?; - let p = entry.path(); - - if !p.is_file() { - continue; - } - - if let Some(ext) = p.extension() { - if ext == "tmp" { - continue; - } - } - - if let Ok(relative_path) = p.strip_prefix(prefixed_dest.clone()) { - keys.push(relative_path.display().to_string()) - } - } - - Ok(keys) - } -} - -pub struct FilesystemReader { - inner: BufReader, - lock_ref: Arc>, -} - -impl FilesystemReader { - fn new(dest_file_path: PathBuf, lock_ref: Arc>) -> std::io::Result { - let f = fs::File::open(dest_file_path.clone())?; - let inner = BufReader::new(f); - Ok(Self { inner, lock_ref }) - } -} - -impl Read for FilesystemReader { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - let _guard = self.lock_ref.read().unwrap(); - self.inner.read(buf) - } -} - -impl KVStorePersister for FilesystemStore { - fn persist(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> { - let (namespace, key) = get_namespace_and_key_from_prefixed(prefixed_key)?; - self.write(&namespace, &key, &object.encode()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::test::utils::random_storage_path; - - use proptest::prelude::*; - proptest! 
{ - #[test] - fn read_write_remove_list_persist(data in any::<[u8; 32]>()) { - let rand_dir = random_storage_path(); - - let fs_store = FilesystemStore::new(rand_dir.into()); - - do_read_write_remove_list_persist(&data, &fs_store); - } - } -} diff --git a/src/io/mod.rs b/src/io/mod.rs index 37c3a2d0f..fbed6cfbb 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -1,135 +1,30 @@ //! Objects and traits for data persistence. -pub(crate) mod fs_store; -pub(crate) mod sqlite_store; +pub mod sqlite_store; +#[cfg(test)] +pub(crate) mod test_utils; pub(crate) mod utils; -pub use fs_store::FilesystemStore; -pub use sqlite_store::SqliteStore; - -use lightning::util::persist::KVStorePersister; - -use std::io::Read; -use std::path::PathBuf; -use std::str::FromStr; - -// The namespacs and keys LDK uses for persisting -pub(crate) const CHANNEL_MANAGER_PERSISTENCE_NAMESPACE: &str = ""; -pub(crate) const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager"; - -pub(crate) const CHANNEL_MONITOR_PERSISTENCE_NAMESPACE: &str = "monitors"; - -pub(crate) const NETWORK_GRAPH_PERSISTENCE_NAMESPACE: &str = ""; -pub(crate) const NETWORK_GRAPH_PERSISTENCE_KEY: &str = "network_graph"; - -pub(crate) const SCORER_PERSISTENCE_NAMESPACE: &str = ""; -pub(crate) const SCORER_PERSISTENCE_KEY: &str = "scorer"; - /// The event queue will be persisted under this key. -pub(crate) const EVENT_QUEUE_PERSISTENCE_NAMESPACE: &str = ""; +pub(crate) const EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE: &str = ""; +pub(crate) const EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; pub(crate) const EVENT_QUEUE_PERSISTENCE_KEY: &str = "events"; /// The peer information will be persisted under this key. -pub(crate) const PEER_INFO_PERSISTENCE_NAMESPACE: &str = ""; +pub(crate) const PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = ""; +pub(crate) const PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; /// The payment information will be persisted under this prefix. -pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments"; +pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments"; +pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key. -pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE: &str = ""; +pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE: &str = ""; +pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE: &str = ""; pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp"; /// The last time we broadcast a node announcement will be persisted under this key. -pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE: &str = ""; +pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_PRIMARY_NAMESPACE: &str = ""; +pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_SECONDARY_NAMESPACE: &str = ""; pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY: &str = "latest_node_ann_bcast_timestamp"; - -/// Provides an interface that allows to store and retrieve persisted values that are associated -/// with given keys. -/// -/// In order to avoid collisions the key space is segmented based on the given `namespace`s. -/// Implementations of this trait are free to handle them in different ways, as long as -/// per-namespace key uniqueness is asserted. -/// -/// Keys and namespaces are required to be valid ASCII strings and the empty namespace (`""`) is -/// assumed to be valid namespace. 
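The doc block ending here described LDK Node's own `KVStore` trait, removed just below in favor of upstream `lightning::util::persist::KVStore`: entries are now addressed by `(primary_namespace, secondary_namespace, key)`, and `read` returns the raw bytes instead of a `Reader`. A minimal sketch against the reworked `SqliteStore` introduced later in this diff (the directory is a placeholder, and this assumes `ldk_node::io` remains publicly reachable, as its module header suggests):

```rust
use lightning::util::persist::KVStore;

use ldk_node::io::sqlite_store::SqliteStore;

fn sqlite_roundtrip() -> std::io::Result<()> {
	// Passing `None` twice falls back to DEFAULT_SQLITE_DB_FILE_NAME and
	// DEFAULT_KV_TABLE_NAME; the constructor is now fallible.
	let store = SqliteStore::new("/tmp/demo_sqlite_store".into(), None, None)?;

	// Entries live under (primary_namespace, secondary_namespace, key).
	store.write("payments", "", "some_key", &[42u8; 32])?;
	// `read` now returns the raw bytes rather than a Reader.
	let read = store.read("payments", "", "some_key")?;
	assert_eq!(read, vec![42u8; 32]);
	store.remove("payments", "", "some_key", false)?;
	Ok(())
}
```

This is also why the builder maps `SqliteStore::new` errors to the new `BuildError::KVStoreSetupFailed` variant above.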
-pub trait KVStore: KVStorePersister { - /// A reader as returned by [`Self::read`]. - type Reader: Read; - /// Returns a [`Read`] for the given `namespace` and `key` from which [`Readable`]s may be - /// read. - /// - /// Returns an [`ErrorKind::NotFound`] if the given `key` could not be found in the given `namespace`. - /// - /// [`Readable`]: lightning::util::ser::Readable - /// [`ErrorKind::NotFound`]: std::io::ErrorKind::NotFound - fn read(&self, namespace: &str, key: &str) -> std::io::Result; - /// Persists the given data under the given `key`. - /// - /// Will create the given `namespace` if not already present in the store. - fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()>; - /// Removes any data that had previously been persisted under the given `key`. - fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()>; - /// Returns a list of keys that are stored under the given `namespace`. - /// - /// Will return an empty list if the `namespace` is unknown. - fn list(&self, namespace: &str) -> std::io::Result>; -} - -fn get_namespace_and_key_from_prefixed( - prefixed_key: &str, -) -> lightning::io::Result<(String, String)> { - let dest_file_path = PathBuf::from_str(prefixed_key).map_err(|_| { - let msg = format!("Could not persist file for key {}.", prefixed_key); - lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg) - })?; - - let parent_directory = dest_file_path.parent().ok_or_else(|| { - let msg = format!("Could not persist file for key {}.", prefixed_key); - lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg) - })?; - let namespace = parent_directory.display().to_string(); - - let dest_without_namespace = dest_file_path.strip_prefix(&namespace).map_err(|_| { - let msg = format!("Could not persist file for key {}.", prefixed_key); - lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg) - })?; - let key = dest_without_namespace.display().to_string(); - - Ok((namespace, key)) -} - -#[cfg(test)] -fn do_read_write_remove_list_persist(data: &[u8; 32], kv_store: &K) { - use lightning::util::ser::Readable; - - let namespace = "testspace"; - let key = "testkey"; - - // Test the basic KVStore operations. - kv_store.write(namespace, key, data).unwrap(); - - // Test empty namespace is allowed, but not empty key. 
- kv_store.write("", key, data).unwrap(); - assert!(kv_store.write(namespace, "", data).is_err()); - - let listed_keys = kv_store.list(namespace).unwrap(); - assert_eq!(listed_keys.len(), 1); - assert_eq!(listed_keys[0], key); - - let mut reader = kv_store.read(namespace, key).unwrap(); - let read_data: [u8; 32] = Readable::read(&mut reader).unwrap(); - assert_eq!(*data, read_data); - - kv_store.remove(namespace, key).unwrap(); - - let listed_keys = kv_store.list(namespace).unwrap(); - assert_eq!(listed_keys.len(), 0); - - // Test KVStorePersister - let prefixed_key = format!("{}/{}", namespace, key); - kv_store.persist(&prefixed_key, &data).unwrap(); - let mut reader = kv_store.read(namespace, key).unwrap(); - let read_data: [u8; 32] = Readable::read(&mut reader).unwrap(); - assert_eq!(*data, read_data); -} diff --git a/src/io/sqlite_store.rs b/src/io/sqlite_store.rs deleted file mode 100644 index 0c71d9e51..000000000 --- a/src/io/sqlite_store.rs +++ /dev/null @@ -1,189 +0,0 @@ -use super::*; - -use lightning::util::persist::KVStorePersister; -use lightning::util::ser::Writeable; - -use rusqlite::{named_params, Connection}; - -use std::fs; -use std::io::Cursor; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; - -// The database file name. -const SQLITE_DB_FILE: &str = "ldk_node.sqlite"; - -// The table in which we store all data. -const KV_TABLE_NAME: &str = "ldk_node_data"; - -// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration. -const SCHEMA_USER_VERSION: u16 = 1; - -/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database. -/// -/// [SQLite]: https://sqlite.org -pub struct SqliteStore { - connection: Arc>, -} - -impl SqliteStore { - pub(crate) fn new(dest_dir: PathBuf) -> Self { - fs::create_dir_all(dest_dir.clone()).unwrap_or_else(|_| { - panic!("Failed to create database destination directory: {}", dest_dir.display()) - }); - let mut db_file_path = dest_dir.clone(); - db_file_path.push(SQLITE_DB_FILE); - - let connection = Connection::open(db_file_path.clone()).unwrap_or_else(|_| { - panic!("Failed to open/create database file: {}", db_file_path.display()) - }); - - connection - .pragma(Some(rusqlite::DatabaseName::Main), "user_version", SCHEMA_USER_VERSION, |_| { - Ok(()) - }) - .unwrap_or_else(|_| panic!("Failed to set PRAGMA user_version")); - - let sql = format!( - "CREATE TABLE IF NOT EXISTS {} ( - namespace TEXT NOT NULL, - key TEXT NOT NULL CHECK (key <> ''), - value BLOB, PRIMARY KEY ( namespace, key ) - );", - KV_TABLE_NAME - ); - connection - .execute(&sql, []) - .unwrap_or_else(|_| panic!("Failed to create table: {}", KV_TABLE_NAME)); - - let connection = Arc::new(Mutex::new(connection)); - Self { connection } - } -} - -impl KVStore for SqliteStore { - type Reader = Cursor>; - - fn read(&self, namespace: &str, key: &str) -> std::io::Result { - let locked_conn = self.connection.lock().unwrap(); - let sql = - format!("SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", KV_TABLE_NAME); - - let res = locked_conn - .query_row( - &sql, - named_params! 
{ - ":namespace": namespace, - ":key": key, - }, - |row| row.get(0), - ) - .map_err(|e| match e { - rusqlite::Error::QueryReturnedNoRows => { - let msg = - format!("Failed to read as key could not be found: {}/{}", namespace, key); - std::io::Error::new(std::io::ErrorKind::NotFound, msg) - } - e => { - let msg = format!("Failed to read from key {}/{}: {}", namespace, key, e); - std::io::Error::new(std::io::ErrorKind::Other, msg) - } - })?; - Ok(Cursor::new(res)) - } - - fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> { - let locked_conn = self.connection.lock().unwrap(); - - let sql = format!( - "INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);", - KV_TABLE_NAME - ); - - locked_conn - .execute( - &sql, - named_params! { - ":namespace": namespace, - ":key": key, - ":value": buf, - }, - ) - .map(|_| ()) - .map_err(|e| { - let msg = format!("Failed to write to key {}/{}: {}", namespace, key, e); - std::io::Error::new(std::io::ErrorKind::Other, msg) - }) - } - - fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> { - let locked_conn = self.connection.lock().unwrap(); - - let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND key=:key;", KV_TABLE_NAME); - locked_conn - .execute( - &sql, - named_params! { - ":namespace": namespace, - ":key": key, - }, - ) - .map_err(|e| { - let msg = format!("Failed to delete key {}/{}: {}", namespace, key, e); - std::io::Error::new(std::io::ErrorKind::Other, msg) - })?; - Ok(()) - } - - fn list(&self, namespace: &str) -> std::io::Result> { - let locked_conn = self.connection.lock().unwrap(); - - let sql = format!("SELECT key FROM {} WHERE namespace=:namespace", KV_TABLE_NAME); - let mut stmt = locked_conn.prepare(&sql).map_err(|e| { - let msg = format!("Failed to prepare statement: {}", e); - std::io::Error::new(std::io::ErrorKind::Other, msg) - })?; - - let mut keys = Vec::new(); - - let rows_iter = stmt - .query_map(named_params! {":namespace": namespace, }, |row| row.get(0)) - .map_err(|e| { - let msg = format!("Failed to retrieve queried rows: {}", e); - std::io::Error::new(std::io::ErrorKind::Other, msg) - })?; - - for k in rows_iter { - keys.push(k.map_err(|e| { - let msg = format!("Failed to retrieve queried rows: {}", e); - std::io::Error::new(std::io::ErrorKind::Other, msg) - })?); - } - - Ok(keys) - } -} - -impl KVStorePersister for SqliteStore { - fn persist(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> { - let (namespace, key) = get_namespace_and_key_from_prefixed(prefixed_key)?; - self.write(&namespace, &key, &object.encode()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::test::utils::random_storage_path; - - use proptest::prelude::*; - proptest! 
{ - #[test] - fn read_write_remove_list_persist(data in any::<[u8; 32]>()) { - let rand_dir = random_storage_path(); - let sqlite_store = SqliteStore::new(rand_dir.into()); - - do_read_write_remove_list_persist(&data, &sqlite_store); - } - } -} diff --git a/src/io/sqlite_store/migrations.rs b/src/io/sqlite_store/migrations.rs new file mode 100644 index 000000000..bdf1baf9a --- /dev/null +++ b/src/io/sqlite_store/migrations.rs @@ -0,0 +1,165 @@ +use rusqlite::Connection; + +use lightning::io; + +pub(super) fn migrate_schema( + connection: &mut Connection, kv_table_name: &str, from_version: u16, to_version: u16, +) -> io::Result<()> { + assert!(from_version < to_version); + if from_version == 1 && to_version == 2 { + let tx = connection.transaction().map_err(|e| { + let msg = format!( + "Failed to migrate table {} from user_version {} to {}: {}", + kv_table_name, from_version, to_version, e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + // Rename 'namespace' column to 'primary_namespace' + let sql = format!( + "ALTER TABLE {} + RENAME COLUMN namespace TO primary_namespace;", + kv_table_name + ); + + tx.execute(&sql, []).map_err(|e| { + let msg = format!( + "Failed to migrate table {} from user_version {} to {}: {}", + kv_table_name, from_version, to_version, e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + // Add new 'secondary_namespace' column + let sql = format!( + "ALTER TABLE {} + ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;", + kv_table_name + ); + + tx.execute(&sql, []).map_err(|e| { + let msg = format!( + "Failed to migrate table {} from user_version {} to {}: {}", + kv_table_name, from_version, to_version, e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + // Update user_version + tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", to_version, |_| Ok(())) + .map_err(|e| { + let msg = format!( + "Failed to upgrade user_version from {} to {}: {}", + from_version, to_version, e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + tx.commit().map_err(|e| { + let msg = format!( + "Failed to migrate table {} from user_version {} to {}: {}", + kv_table_name, from_version, to_version, e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use crate::io::sqlite_store::SqliteStore; + use crate::io::test_utils::do_read_write_remove_list_persist; + use crate::test::utils::random_storage_path; + + use lightning::util::persist::KVStore; + + use rusqlite::{named_params, Connection}; + + use std::fs; + + #[test] + fn rwrl_post_schema_1_migration() { + let old_schema_version = 1; + + let mut temp_path = random_storage_path(); + temp_path.push("rwrl_post_schema_1_migration"); + + let db_file_name = "test_db".to_string(); + let kv_table_name = "test_table".to_string(); + + let test_namespace = "testspace".to_string(); + let test_key = "testkey".to_string(); + let test_data = [42u8; 32]; + + { + // We create a database with a SCHEMA_VERSION 1 table + fs::create_dir_all(temp_path.clone()).unwrap(); + let mut db_file_path = temp_path.clone(); + db_file_path.push(db_file_name.clone()); + + let connection = Connection::open(db_file_path.clone()).unwrap(); + + connection + .pragma( + Some(rusqlite::DatabaseName::Main), + "user_version", + old_schema_version, + |_| Ok(()), + ) + .unwrap(); + + let sql = format!( + "CREATE TABLE IF NOT EXISTS {} ( + namespace TEXT NOT NULL, + key TEXT NOT NULL CHECK (key <> ''), + value BLOB, PRIMARY KEY ( namespace, key ) + );", + kv_table_name + ); + + 
connection.execute(&sql, []).unwrap(); + + // We write some data to the table + let sql = format!( + "INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);", + kv_table_name + ); + let mut stmt = connection.prepare_cached(&sql).unwrap(); + + stmt.execute(named_params! { + ":namespace": test_namespace, + ":key": test_key, + ":value": test_data, + }) + .unwrap(); + + // We read the just-written data back to assert the write happened. + let sql = format!( + "SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", + kv_table_name + ); + let mut stmt = connection.prepare_cached(&sql).unwrap(); + + let res: Vec<u8> = stmt + .query_row( + named_params! { + ":namespace": test_namespace, + ":key": test_key, + }, + |row| row.get(0), + ) + .unwrap(); + + assert_eq!(res, test_data); + } + + // Check we migrate the db just fine without losing our written data. + let store = SqliteStore::new(temp_path, Some(db_file_name), Some(kv_table_name)).unwrap(); + let res = store.read(&test_namespace, "", &test_key).unwrap(); + assert_eq!(res, test_data); + + // Check we can continue to use the store just fine. + do_read_write_remove_list_persist(&store); + } +} diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs new file mode 100644 index 000000000..605c8b77a --- /dev/null +++ b/src/io/sqlite_store/mod.rs @@ -0,0 +1,348 @@ +//! Objects related to [`SqliteStore`] live here. +use crate::io::utils::check_namespace_key_validity; + +use lightning::io; +use lightning::util::persist::KVStore; +use lightning::util::string::PrintableString; + +use rusqlite::{named_params, Connection}; + +use std::fs; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +mod migrations; + +/// LDK Node's database file name. +pub const SQLITE_DB_FILE_NAME: &str = "ldk_node_data.sqlite"; +/// LDK Node's table in which we store all data. +pub const KV_TABLE_NAME: &str = "ldk_node_data"; + +/// The default database file name. +pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite"; + +/// The default table in which we store all data. +pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data"; + +// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration. +const SCHEMA_USER_VERSION: u16 = 2; + +/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database. +/// +/// [SQLite]: https://sqlite.org +pub struct SqliteStore { + connection: Arc<Mutex<Connection>>, + data_dir: PathBuf, + kv_table_name: String, +} + +impl SqliteStore { + /// Constructs a new [`SqliteStore`]. + /// + /// If not already existing, a new SQLite database will be created in the given `data_dir` under the + /// given `db_file_name` (or default to [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`). + /// + /// Similarly, the given `kv_table_name` will be used or default to [`DEFAULT_KV_TABLE_NAME`].
+ pub fn new( + data_dir: PathBuf, db_file_name: Option, kv_table_name: Option, + ) -> io::Result { + let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string()); + let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string()); + + fs::create_dir_all(data_dir.clone()).map_err(|e| { + let msg = format!( + "Failed to create database destination directory {}: {}", + data_dir.display(), + e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + let mut db_file_path = data_dir.clone(); + db_file_path.push(db_file_name); + + let mut connection = Connection::open(db_file_path.clone()).map_err(|e| { + let msg = + format!("Failed to open/create database file {}: {}", db_file_path.display(), e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let sql = format!("SELECT user_version FROM pragma_user_version"); + let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap(); + + if version_res == 0 { + // New database, set our SCHEMA_USER_VERSION and continue + connection + .pragma( + Some(rusqlite::DatabaseName::Main), + "user_version", + SCHEMA_USER_VERSION, + |_| Ok(()), + ) + .map_err(|e| { + let msg = format!("Failed to set PRAGMA user_version: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + } else if version_res < SCHEMA_USER_VERSION { + migrations::migrate_schema( + &mut connection, + &kv_table_name, + version_res, + SCHEMA_USER_VERSION, + )?; + } else if version_res > SCHEMA_USER_VERSION { + let msg = format!( + "Failed to open database: incompatible schema version {}. Expected: {}", + version_res, SCHEMA_USER_VERSION + ); + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } + + let sql = format!( + "CREATE TABLE IF NOT EXISTS {} ( + primary_namespace TEXT NOT NULL, + secondary_namespace TEXT DEFAULT \"\" NOT NULL, + key TEXT NOT NULL CHECK (key <> ''), + value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key ) + );", + kv_table_name + ); + + connection.execute(&sql, []).map_err(|e| { + let msg = format!("Failed to create table {}: {}", kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let connection = Arc::new(Mutex::new(connection)); + Ok(Self { connection, data_dir, kv_table_name }) + } + + /// Returns the data directory. + pub fn get_data_dir(&self) -> PathBuf { + self.data_dir.clone() + } +} + +impl KVStore for SqliteStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> std::io::Result> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; + + let locked_conn = self.connection.lock().unwrap(); + let sql = + format!("SELECT value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", + self.kv_table_name); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + + let res = stmt + .query_row( + named_params! 
{ + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":key": key, + }, + |row| row.get(0), + ) + .map_err(|e| match e { + rusqlite::Error::QueryReturnedNoRows => { + let msg = format!( + "Failed to read as key could not be found: {}/{}/{}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key) + ); + std::io::Error::new(std::io::ErrorKind::NotFound, msg) + } + e => { + let msg = format!( + "Failed to read from key {}/{}/{}: {}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key), + e + ); + std::io::Error::new(std::io::ErrorKind::Other, msg) + } + })?; + Ok(res) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> std::io::Result<()> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; + + let locked_conn = self.connection.lock().unwrap(); + + let sql = format!( + "INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);", + self.kv_table_name + ); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + + stmt.execute(named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":key": key, + ":value": buf, + }) + .map(|_| ()) + .map_err(|e| { + let msg = format!( + "Failed to write to key {}/{}/{}: {}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key), + e + ); + std::io::Error::new(std::io::ErrorKind::Other, msg) + }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> std::io::Result<()> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; + + let locked_conn = self.connection.lock().unwrap(); + + let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + + stmt.execute(named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":key": key, + }) + .map_err(|e| { + let msg = format!( + "Failed to delete key {}/{}/{}: {}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key), + e + ); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + Ok(()) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> std::io::Result> { + check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; + + let locked_conn = self.connection.lock().unwrap(); + + let sql = format!( + "SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace", + self.kv_table_name + ); + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + + let mut keys = Vec::new(); + + let rows_iter = stmt + .query_map( + named_params! 
+					":primary_namespace": primary_namespace,
+					":secondary_namespace": secondary_namespace,
+				},
+				|row| row.get(0),
+			)
+			.map_err(|e| {
+				let msg = format!("Failed to retrieve queried rows: {}", e);
+				std::io::Error::new(std::io::ErrorKind::Other, msg)
+			})?;
+
+		for k in rows_iter {
+			keys.push(k.map_err(|e| {
+				let msg = format!("Failed to retrieve queried rows: {}", e);
+				std::io::Error::new(std::io::ErrorKind::Other, msg)
+			})?);
+		}
+
+		Ok(keys)
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use crate::io::test_utils::{do_read_write_remove_list_persist, do_test_store};
+	use crate::test::utils::random_storage_path;
+
+	impl Drop for SqliteStore {
+		fn drop(&mut self) {
+			match fs::remove_dir_all(&self.data_dir) {
+				Err(e) => println!("Failed to remove test store directory: {}", e),
+				_ => {}
+			}
+		}
+	}
+
+	#[test]
+	fn read_write_remove_list_persist() {
+		let mut temp_path = random_storage_path();
+		temp_path.push("read_write_remove_list_persist");
+		let store = SqliteStore::new(
+			temp_path,
+			Some("test_db".to_string()),
+			Some("test_table".to_string()),
+		)
+		.unwrap();
+		do_read_write_remove_list_persist(&store);
+	}
+
+	#[test]
+	fn test_sqlite_store() {
+		let mut temp_path = random_storage_path();
+		temp_path.push("test_sqlite_store");
+		let store_0 = SqliteStore::new(
+			temp_path.clone(),
+			Some("test_db_0".to_string()),
+			Some("test_table".to_string()),
+		)
+		.unwrap();
+		let store_1 = SqliteStore::new(
+			temp_path,
+			Some("test_db_1".to_string()),
+			Some("test_table".to_string()),
+		)
+		.unwrap();
+		do_test_store(&store_0, &store_1)
+	}
+}
+
+#[cfg(ldk_bench)]
+/// Benches
+pub mod bench {
+	use criterion::Criterion;
+
+	/// Bench!
+	pub fn bench_sends(bench: &mut Criterion) {
+		let store_a = super::SqliteStore::new("bench_sqlite_store_a".into(), None, None).unwrap();
+		let store_b = super::SqliteStore::new("bench_sqlite_store_b".into(), None, None).unwrap();
+		lightning::ln::channelmanager::bench::bench_two_sends(
+			bench,
+			"bench_sqlite_persisted_sends",
+			store_a,
+			store_b,
+		);
+	}
+}
diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs
new file mode 100644
index 000000000..3abd75b66
--- /dev/null
+++ b/src/io/test_utils.rs
@@ -0,0 +1,320 @@
+use crate::io::sqlite_store::SqliteStore;
+use lightning_persister::fs_store::FilesystemStore;
+
+use lightning::ln::functional_test_utils::{
+	connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block,
+	create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
+};
+use lightning::util::persist::{read_channel_monitors, KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN};
+
+use lightning::chain::channelmonitor::CLOSED_CHANNEL_UPDATE_ID;
+use lightning::events::ClosureReason;
+use lightning::util::test_utils::{self, TestStore};
+use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event};
+
+use std::panic::RefUnwindSafe;
+use std::path::PathBuf;
+use std::sync::RwLock;
+
+pub(crate) fn do_read_write_remove_list_persist<K: KVStore + RefUnwindSafe>(kv_store: &K) {
+	let data = [42u8; 32];
+
+	let primary_namespace = "testspace";
+	let secondary_namespace = "testsubspace";
+	let key = "testkey";
+
+	// Test the basic KVStore operations.
+	kv_store.write(primary_namespace, secondary_namespace, key, &data).unwrap();
+
+	// Test empty primary/secondary namespaces are allowed, but not an empty primary namespace
+	// with a non-empty secondary namespace, and not an empty key.
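+	// (`check_namespace_key_validity` enforces these invariants via `debug_assert!`, which is
+	// why the two invalid writes below surface as panics caught by `catch_unwind`.)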
+	kv_store.write("", "", key, &data).unwrap();
+	let res = std::panic::catch_unwind(|| kv_store.write("", secondary_namespace, key, &data));
+	assert!(res.is_err());
+	let res = std::panic::catch_unwind(|| {
+		kv_store.write(primary_namespace, secondary_namespace, "", &data)
+	});
+	assert!(res.is_err());
+
+	let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap();
+	assert_eq!(listed_keys.len(), 1);
+	assert_eq!(listed_keys[0], key);
+
+	let read_data = kv_store.read(primary_namespace, secondary_namespace, key).unwrap();
+	assert_eq!(data, &*read_data);
+
+	kv_store.remove(primary_namespace, secondary_namespace, key, false).unwrap();
+
+	let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap();
+	assert_eq!(listed_keys.len(), 0);
+
+	// Ensure we have no issue operating with primary_namespace/secondary_namespace/key being
+	// KVSTORE_NAMESPACE_KEY_MAX_LEN chars long.
+	let max_chars: String = std::iter::repeat('A').take(KVSTORE_NAMESPACE_KEY_MAX_LEN).collect();
+	kv_store.write(&max_chars, &max_chars, &max_chars, &data).unwrap();
+
+	let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
+	assert_eq!(listed_keys.len(), 1);
+	assert_eq!(listed_keys[0], max_chars);
+
+	let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap();
+	assert_eq!(data, &*read_data);
+
+	kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap();
+
+	let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
+	assert_eq!(listed_keys.len(), 0);
+}
+
+// Integration-test the given KVStore implementation. Test relaying a few payments and check that
+// the persisted data is updated the appropriate number of times.
+pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let chain_mon_0 = test_utils::TestChainMonitor::new(
+		Some(&chanmon_cfgs[0].chain_source),
+		&chanmon_cfgs[0].tx_broadcaster,
+		&chanmon_cfgs[0].logger,
+		&chanmon_cfgs[0].fee_estimator,
+		store_0,
+		node_cfgs[0].keys_manager,
+	);
+	let chain_mon_1 = test_utils::TestChainMonitor::new(
+		Some(&chanmon_cfgs[1].chain_source),
+		&chanmon_cfgs[1].tx_broadcaster,
+		&chanmon_cfgs[1].logger,
+		&chanmon_cfgs[1].fee_estimator,
+		store_1,
+		node_cfgs[1].keys_manager,
+	);
+	node_cfgs[0].chain_monitor = chain_mon_0;
+	node_cfgs[1].chain_monitor = chain_mon_1;
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+	// Check that the persisted channel data is empty before any channels are
+	// open.
+	let mut persisted_chan_data_0 =
+		read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
+	assert_eq!(persisted_chan_data_0.len(), 0);
+	let mut persisted_chan_data_1 =
+		read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
+	assert_eq!(persisted_chan_data_1.len(), 0);
+
+	// Helper to make sure the channel is on the expected update ID.
+	macro_rules!
check_persisted_data { + ($expected_update_id: expr) => { + persisted_chan_data_0 = + read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager) + .unwrap(); + assert_eq!(persisted_chan_data_0.len(), 1); + for (_, mon) in persisted_chan_data_0.iter() { + assert_eq!(mon.get_latest_update_id(), $expected_update_id); + } + persisted_chan_data_1 = + read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager) + .unwrap(); + assert_eq!(persisted_chan_data_1.len(), 1); + for (_, mon) in persisted_chan_data_1.iter() { + assert_eq!(mon.get_latest_update_id(), $expected_update_id); + } + }; + } + + // Create some initial channel and check that a channel was persisted. + let _ = create_announced_chan_between_nodes(&nodes, 0, 1); + check_persisted_data!(0); + + // Send a few payments and make sure the monitors are updated to the latest. + send_payment(&nodes[0], &vec![&nodes[1]][..], 8000000); + check_persisted_data!(5); + send_payment(&nodes[1], &vec![&nodes[0]][..], 4000000); + check_persisted_data!(10); + + // Force close because cooperative close doesn't result in any persisted + // updates. + nodes[0] + .node + .force_close_broadcasting_latest_txn( + &nodes[0].node.list_channels()[0].channel_id, + &nodes[1].node.get_our_node_id(), + ) + .unwrap(); + check_closed_event!( + nodes[0], + 1, + ClosureReason::HolderForceClosed, + [nodes[1].node.get_our_node_id()], + 100000 + ); + check_closed_broadcast!(nodes[0], true); + check_added_monitors!(nodes[0], 1); + + let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap(); + assert_eq!(node_txn.len(), 1); + + connect_block( + &nodes[1], + &create_dummy_block( + nodes[0].best_block_hash(), + 42, + vec![node_txn[0].clone(), node_txn[0].clone()], + ), + ); + check_closed_broadcast!(nodes[1], true); + check_closed_event!( + nodes[1], + 1, + ClosureReason::CommitmentTxConfirmed, + [nodes[0].node.get_our_node_id()], + 100000 + ); + check_added_monitors!(nodes[1], 1); + + // Make sure everything is persisted as expected after close. + check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID); +} + +// A `KVStore` impl for testing purposes that wraps all our `KVStore`s and asserts their synchronicity. 
+pub(crate) struct TestSyncStore {
+	serializer: RwLock<()>,
+	test_store: TestStore,
+	fs_store: FilesystemStore,
+	sqlite_store: SqliteStore,
+}
+
+impl TestSyncStore {
+	pub(crate) fn new(dest_dir: PathBuf) -> Self {
+		let serializer = RwLock::new(());
+		let mut fs_dir = dest_dir.clone();
+		fs_dir.push("fs_store");
+		let fs_store = FilesystemStore::new(fs_dir);
+		let mut sql_dir = dest_dir.clone();
+		sql_dir.push("sqlite_store");
+		let sqlite_store = SqliteStore::new(
+			sql_dir,
+			Some("test_sync_db".to_string()),
+			Some("test_sync_table".to_string()),
+		)
+		.unwrap();
+		let test_store = TestStore::new(false);
+		Self { serializer, fs_store, sqlite_store, test_store }
+	}
+
+	fn do_list(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> std::io::Result<Vec<String>> {
+		let fs_res = self.fs_store.list(primary_namespace, secondary_namespace);
+		let sqlite_res = self.sqlite_store.list(primary_namespace, secondary_namespace);
+		let test_res = self.test_store.list(primary_namespace, secondary_namespace);
+
+		match fs_res {
+			Ok(mut list) => {
+				list.sort();
+
+				let mut sqlite_list = sqlite_res.unwrap();
+				sqlite_list.sort();
+				assert_eq!(list, sqlite_list);
+
+				let mut test_list = test_res.unwrap();
+				test_list.sort();
+				assert_eq!(list, test_list);
+
+				Ok(list)
+			}
+			Err(e) => {
+				assert!(sqlite_res.is_err());
+				assert!(test_res.is_err());
+				Err(e)
+			}
+		}
+	}
+}
+
+impl KVStore for TestSyncStore {
+	fn read(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> std::io::Result<Vec<u8>> {
+		let _guard = self.serializer.read().unwrap();
+
+		let fs_res = self.fs_store.read(primary_namespace, secondary_namespace, key);
+		let sqlite_res = self.sqlite_store.read(primary_namespace, secondary_namespace, key);
+		let test_res = self.test_store.read(primary_namespace, secondary_namespace, key);
+
+		match fs_res {
+			Ok(read) => {
+				assert_eq!(read, sqlite_res.unwrap());
+				assert_eq!(read, test_res.unwrap());
+				Ok(read)
+			}
+			Err(e) => {
+				assert!(sqlite_res.is_err());
+				assert_eq!(e.kind(), unsafe { sqlite_res.unwrap_err_unchecked().kind() });
+				assert!(test_res.is_err());
+				assert_eq!(e.kind(), unsafe { test_res.unwrap_err_unchecked().kind() });
+				Err(e)
+			}
+		}
+	}
+
+	fn write(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
+	) -> std::io::Result<()> {
+		let _guard = self.serializer.write().unwrap();
+		let fs_res = self.fs_store.write(primary_namespace, secondary_namespace, key, buf);
+		let sqlite_res = self.sqlite_store.write(primary_namespace, secondary_namespace, key, buf);
+		let test_res = self.test_store.write(primary_namespace, secondary_namespace, key, buf);
+
+		assert!(self
+			.do_list(primary_namespace, secondary_namespace)
+			.unwrap()
+			.contains(&key.to_string()));
+
+		match fs_res {
+			Ok(()) => {
+				assert!(sqlite_res.is_ok());
+				assert!(test_res.is_ok());
+				Ok(())
+			}
+			Err(e) => {
+				assert!(sqlite_res.is_err());
+				assert!(test_res.is_err());
+				Err(e)
+			}
+		}
+	}
+
+	fn remove(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+	) -> std::io::Result<()> {
+		let _guard = self.serializer.write().unwrap();
+		let fs_res = self.fs_store.remove(primary_namespace, secondary_namespace, key, lazy);
+		let sqlite_res =
+			self.sqlite_store.remove(primary_namespace, secondary_namespace, key, lazy);
+		let test_res = self.test_store.remove(primary_namespace, secondary_namespace, key, lazy);
+
+		assert!(!self
+			.do_list(primary_namespace, secondary_namespace)
+			.unwrap()
+			.contains(&key.to_string()));
+
+		match fs_res {
+			Ok(()) => {
+				assert!(sqlite_res.is_ok());
+				assert!(test_res.is_ok());
+				Ok(())
+			}
+			Err(e) => {
+				assert!(sqlite_res.is_err());
+				assert!(test_res.is_err());
+				Err(e)
+			}
+		}
+	}
+
+	fn list(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> std::io::Result<Vec<String>> {
+		let _guard = self.serializer.read().unwrap();
+		self.do_list(primary_namespace, secondary_namespace)
+	}
+}
diff --git a/src/io/utils.rs b/src/io/utils.rs
index 9bace5d5a..f722d8668 100644
--- a/src/io/utils.rs
+++ b/src/io/utils.rs
@@ -5,26 +5,27 @@ use crate::logger::log_error;
 use crate::peer_store::PeerStore;
 use crate::{Error, EventQueue, PaymentDetails};
 
-use lightning::chain::channelmonitor::ChannelMonitor;
 use lightning::routing::gossip::NetworkGraph;
 use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters};
-use lightning::sign::{EntropySource, SignerProvider};
 use lightning::util::logger::Logger;
+use lightning::util::persist::{
+	KVStore, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN,
+	NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
+	NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
+	SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
+};
 use lightning::util::ser::{Readable, ReadableArgs, Writeable};
+use lightning::util::string::PrintableString;
 
 use bip39::Mnemonic;
-use bitcoin::hash_types::{BlockHash, Txid};
-use bitcoin::hashes::hex::FromHex;
 use rand::{thread_rng, RngCore};
 
 use std::fs;
-use std::io::Write;
+use std::io::{Cursor, Write};
 use std::ops::Deref;
 use std::path::Path;
 use std::sync::Arc;
 
-use super::KVStore;
-
 /// Generates a random [BIP 39] mnemonic.
 ///
 /// The result may be used to initialize the [`Node`] entropy, i.e., can be given to
@@ -90,51 +91,6 @@ where
 	}
 }
 
-/// Read previously persisted [`ChannelMonitor`]s from the store.
-pub(crate) fn read_channel_monitors<K: KVStore + Sync + Send, ES: Deref, SP: Deref>(
-	kv_store: Arc<K>, entropy_source: ES, signer_provider: SP,
-) -> std::io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
-where
-	ES::Target: EntropySource + Sized,
-	SP::Target: SignerProvider + Sized,
-{
-	let mut res = Vec::new();
-
-	for stored_key in kv_store.list(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE)? {
-		let txid = Txid::from_hex(stored_key.split_at(64).0).map_err(|_| {
-			std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
-		})?;
-
-		let index: u16 = stored_key.split_at(65).1.parse().map_err(|_| {
-			std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid tx index in stored key")
-		})?;
-
-		match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>::read(
-			&mut kv_store.read(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &stored_key)?,
-			(&*entropy_source, &*signer_provider),
-		) {
-			Ok((block_hash, channel_monitor)) => {
-				if channel_monitor.get_funding_txo().0.txid != txid
-					|| channel_monitor.get_funding_txo().0.index != index
-				{
-					return Err(std::io::Error::new(
-						std::io::ErrorKind::InvalidData,
-						"ChannelMonitor was stored under the wrong key",
-					));
-				}
-				res.push((block_hash, channel_monitor));
-			}
-			Err(e) => {
-				return Err(std::io::Error::new(
-					std::io::ErrorKind::InvalidData,
-					format!("Failed to deserialize ChannelMonitor: {}", e),
-				))
-			}
-		}
-	}
-	Ok(res)
-}
-
 /// Read a previously persisted [`NetworkGraph`] from the store.
 pub(crate) fn read_network_graph<K: KVStore + Sync + Send, L: Deref>(
 	kv_store: Arc<K>, logger: L,
@@ -142,8 +98,11 @@ pub(crate) fn read_network_graph<K: KVStore + Sync + Send, L: Deref>(
 where
 	L::Target: Logger,
 {
-	let mut reader =
-		kv_store.read(NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY)?;
+	let mut reader = Cursor::new(kv_store.read(
+		NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
+		NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
+		NETWORK_GRAPH_PERSISTENCE_KEY,
+	)?);
 	NetworkGraph::read(&mut reader, logger.clone()).map_err(|e| {
 		log_error!(logger, "Failed to deserialize NetworkGraph: {}", e);
 		std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NetworkGraph")
@@ -162,7 +121,7 @@ where
 	L::Target: Logger,
 {
 	let params = ProbabilisticScoringDecayParameters::default();
-	let mut reader = kv_store.read(SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_KEY)?;
+	let mut reader = Cursor::new(kv_store.read(
+		SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
+		SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
+		SCORER_PERSISTENCE_KEY,
+	)?);
 	let args = (params, network_graph, logger.clone());
 	ProbabilisticScorer::read(&mut reader, args).map_err(|e| {
 		log_error!(logger, "Failed to deserialize scorer: {}", e);
@@ -177,8 +140,11 @@ pub(crate) fn read_event_queue<K: KVStore + Sync + Send, L: Deref>(
 where
 	L::Target: Logger,
 {
-	let mut reader =
-		kv_store.read(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY)?;
+	let mut reader = Cursor::new(kv_store.read(
+		EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
+		EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
+		EVENT_QUEUE_PERSISTENCE_KEY,
+	)?);
 	EventQueue::read(&mut reader, (kv_store, logger.clone())).map_err(|e| {
 		log_error!(logger, "Failed to deserialize event queue: {}", e);
 		std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize EventQueue")
@@ -192,7 +158,11 @@ pub(crate) fn read_peer_info<K: KVStore + Sync + Send, L: Deref>(
 where
 	L::Target: Logger,
 {
-	let mut reader = kv_store.read(PEER_INFO_PERSISTENCE_NAMESPACE, PEER_INFO_PERSISTENCE_KEY)?;
+	let mut reader = Cursor::new(kv_store.read(
+		PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
+		PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
+		PEER_INFO_PERSISTENCE_KEY,
+	)?);
 	PeerStore::read(&mut reader, (kv_store, logger.clone())).map_err(|e| {
 		log_error!(logger, "Failed to deserialize peer store: {}", e);
 		std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize PeerStore")
@@ -208,11 +178,16 @@ where
 {
 	let mut res = Vec::new();
 
-	for stored_key in kv_store.list(PAYMENT_INFO_PERSISTENCE_NAMESPACE)? {
-		let payment = PaymentDetails::read(
-			&mut kv_store.read(PAYMENT_INFO_PERSISTENCE_NAMESPACE, &stored_key)?,
-		)
-		.map_err(|e| {
+	for stored_key in kv_store.list(
+		PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
+		PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
+	)? {
+		let mut reader = Cursor::new(kv_store.read(
+			PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
+			PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
+			&stored_key,
+		)?);
+		let payment = PaymentDetails::read(&mut reader).map_err(|e| {
 			log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
 			std::io::Error::new(
 				std::io::ErrorKind::InvalidData,
@@ -230,8 +205,11 @@ pub(crate) fn read_latest_rgs_sync_timestamp<K: KVStore + Sync + Send, L: Deref>(
 where
 	L::Target: Logger,
 {
-	let mut reader =
-		kv_store.read(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY)?;
+	let mut reader = Cursor::new(kv_store.read(
+		LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE,
+		LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE,
+		LATEST_RGS_SYNC_TIMESTAMP_KEY,
+	)?);
 	u32::read(&mut reader).map_err(|e| {
 		log_error!(logger, "Failed to deserialize latest RGS sync timestamp: {}", e);
 		std::io::Error::new(
@@ -249,12 +227,18 @@ where
 {
 	let data = updated_timestamp.encode();
 	kv_store
-		.write(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY, &data)
+		.write(
+			LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE,
+			LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE,
+			LATEST_RGS_SYNC_TIMESTAMP_KEY,
+			&data,
+		)
		.map_err(|e| {
 			log_error!(
 				logger,
-				"Writing data to key {}/{} failed due to: {}",
-				LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE,
+				"Writing data to key {}/{}/{} failed due to: {}",
+				LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE,
+				LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE,
 				LATEST_RGS_SYNC_TIMESTAMP_KEY,
 				e
 			);
@@ -268,8 +252,11 @@ pub(crate) fn read_latest_node_ann_bcast_timestamp<K: KVStore + Sync + Send, L: Deref>(
+pub(crate) fn is_valid_kvstore_str(key: &str) -> bool {
+	key.len() <= KVSTORE_NAMESPACE_KEY_MAX_LEN
+		&& key.chars().all(|c| KVSTORE_NAMESPACE_KEY_ALPHABET.contains(c))
+}
+
+pub(crate) fn check_namespace_key_validity(
+	primary_namespace: &str, secondary_namespace: &str, key: Option<&str>, operation: &str,
+) -> Result<(), std::io::Error> {
+	if let Some(key) = key {
+		if key.is_empty() {
+			debug_assert!(
+				false,
+				"Failed to {} {}/{}/{}: key may not be empty.",
+				operation,
+				PrintableString(primary_namespace),
+				PrintableString(secondary_namespace),
+				PrintableString(key)
+			);
+			let msg = format!(
+				"Failed to {} {}/{}/{}: key may not be empty.",
+				operation,
+				PrintableString(primary_namespace),
+				PrintableString(secondary_namespace),
+				PrintableString(key)
+			);
+			return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+		}
+
+		if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
+			debug_assert!(false,
+				"Failed to {} {}/{}/{}: primary namespace may not be empty if a non-empty secondary namespace is given.",
+				operation,
+				PrintableString(primary_namespace), PrintableString(secondary_namespace), PrintableString(key));
+			let msg = format!(
+				"Failed to {} {}/{}/{}: primary namespace may not be empty if a non-empty secondary namespace is given.", operation,
+				PrintableString(primary_namespace), PrintableString(secondary_namespace), PrintableString(key));
+			return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+		}
+
+		if !is_valid_kvstore_str(primary_namespace)
+			|| !is_valid_kvstore_str(secondary_namespace)
+			|| !is_valid_kvstore_str(key)
+		{
+			debug_assert!(
+				false,
+				"Failed to {} {}/{}/{}: primary namespace, secondary namespace, and key must be valid.",
+				operation,
+				PrintableString(primary_namespace),
+				PrintableString(secondary_namespace),
+				PrintableString(key)
+			);
+			let msg = format!(
+				"Failed to {} {}/{}/{}: primary namespace, secondary namespace, and key must be valid.",
+				operation,
+				PrintableString(primary_namespace),
PrintableString(secondary_namespace), + PrintableString(key) + ); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + } else { + if primary_namespace.is_empty() && !secondary_namespace.is_empty() { + debug_assert!(false, + "Failed to {} {}/{}: primary namespace may not be empty if a non-empty secondary namespace is given.", + operation, PrintableString(primary_namespace), PrintableString(secondary_namespace)); + let msg = format!( + "Failed to {} {}/{}: primary namespace may not be empty if a non-empty secondary namespace is given.", + operation, PrintableString(primary_namespace), PrintableString(secondary_namespace)); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + if !is_valid_kvstore_str(primary_namespace) || !is_valid_kvstore_str(secondary_namespace) { + debug_assert!( + false, + "Failed to {} {}/{}: primary namespace and secondary namespace must be valid.", + operation, + PrintableString(primary_namespace), + PrintableString(secondary_namespace) + ); + let msg = format!( + "Failed to {} {}/{}: primary namespace and secondary namespace must be valid.", + operation, + PrintableString(primary_namespace), + PrintableString(secondary_namespace) + ); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/lib.rs b/src/lib.rs index 0295650b4..52b93ddfb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ //! [`send_payment`], etc.: //! //! ```no_run -//! use ldk_node::{Builder, NetAddress}; +//! use ldk_node::{Builder, SocketAddress}; //! use ldk_node::lightning_invoice::Bolt11Invoice; //! use ldk_node::bitcoin::secp256k1::PublicKey; //! use ldk_node::bitcoin::Network; @@ -47,7 +47,7 @@ //! // .. fund address .. //! //! let node_id = PublicKey::from_str("NODE_ID").unwrap(); -//! let node_addr = NetAddress::from_str("IP_ADDR:PORT").unwrap(); +//! let node_addr = SocketAddress::from_str("IP_ADDR:PORT").unwrap(); //! node.connect_open_channel(node_id, node_addr, 10000, None, None, false).unwrap(); //! //! 
let event = node.wait_next_event();
@@ -100,7 +100,7 @@ use error::Error;
 
 pub use event::Event;
 pub use types::ChannelConfig;
-pub use types::NetAddress;
+pub use types::SocketAddress;
 
 pub use io::utils::generate_entropy_mnemonic;
@@ -115,21 +115,22 @@ pub use builder::NodeBuilder as Builder;
 
 use event::{EventHandler, EventQueue};
 use gossip::GossipSource;
-use io::KVStore;
 use payment_store::PaymentStore;
 pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus};
 use peer_store::{PeerInfo, PeerStore};
 use types::{ChainMonitor, ChannelManager, KeysManager, NetworkGraph, PeerManager, Router, Scorer};
-pub use types::{ChannelDetails, ChannelId, PeerDetails, UserChannelId};
+pub use types::{ChannelDetails, PeerDetails, UserChannelId};
 use wallet::Wallet;
 
-use logger::{log_debug, log_error, log_info, log_trace, FilesystemLogger, Logger};
+use logger::{log_error, log_info, log_trace, FilesystemLogger, Logger};
 
 use lightning::chain::Confirm;
 use lightning::ln::channelmanager::{self, PaymentId, RecipientOnionFields, Retry};
-use lightning::ln::{PaymentHash, PaymentPreimage};
+use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage};
 use lightning::sign::EntropySource;
+use lightning::util::persist::KVStore;
+
 use lightning::util::config::{ChannelHandshakeConfig, UserConfig};
 pub use lightning::util::logger::Level as LogLevel;
@@ -137,7 +138,7 @@ use lightning_background_processor::process_events_async;
 
 use lightning_transaction_sync::EsploraSyncClient;
 
-use lightning::routing::router::{PaymentParameters, RouteParameters, Router as LdkRouter};
+use lightning::routing::router::{PaymentParameters, RouteParameters};
 use lightning_invoice::{payment, Bolt11Invoice, Currency};
 
 use bitcoin::hashes::sha256::Hash as Sha256;
@@ -149,7 +150,6 @@ use bitcoin::{Address, Txid};
 
 use rand::Rng;
 
-use std::collections::HashMap;
 use std::default::Default;
 use std::net::ToSocketAddrs;
 use std::sync::{Arc, Mutex, RwLock};
@@ -225,7 +225,7 @@ pub struct Config {
 	/// The used Bitcoin network.
 	pub network: Network,
 	/// The IP address and TCP port the node will listen on.
-	pub listening_address: Option<NetAddress>,
+	pub listening_address: Option<SocketAddress>,
 	/// The default CLTV expiry delta to be used for payments.
 	pub default_cltv_expiry_delta: u32,
 	/// The time in-between background sync attempts of the onchain wallet, in seconds.
@@ -294,7 +294,7 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
 	gossip_source: Arc<GossipSource>,
 	kv_store: Arc<K>,
 	logger: Arc<FilesystemLogger>,
-	router: Arc<Router>,
+	_router: Arc<Router>,
 	scorer: Arc<Mutex<Scorer>>,
 	peer_store: Arc<PeerStore<K, Arc<FilesystemLogger>>>,
 	payment_store: Arc<PaymentStore<K, Arc<FilesystemLogger>>>,
@@ -504,7 +504,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
 					"Unable to resolve listening address: {:?}",
 					listening_address
 				);
-				Error::InvalidNetAddress
+				Error::InvalidSocketAddress
 			})?
 			.next()
 			.ok_or_else(|| {
@@ -513,7 +513,7 @@
 					"Unable to resolve listening address: {:?}",
 					listening_address
 				);
-				Error::InvalidNetAddress
+				Error::InvalidSocketAddress
 			})?;
 
 		runtime.spawn(async move {
@@ -781,7 +781,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
 	}
 
 	/// Returns our own listening address.
-	pub fn listening_address(&self) -> Option<NetAddress> {
+	pub fn listening_address(&self) -> Option<SocketAddress> {
 		self.config.listening_address.clone()
 	}
 
@@ -838,7 +838,7 @@
 	///
 	/// If `persist` is set to `true`, we'll remember the peer and reconnect to it on restart.
 	pub fn connect(
-		&self, node_id: PublicKey, address: NetAddress, persist: bool,
+		&self, node_id: PublicKey, address: SocketAddress, persist: bool,
 	) -> Result<(), Error> {
 		let rt_lock = self.runtime.read().unwrap();
 		if rt_lock.is_none() {
@@ -903,7 +903,7 @@
 	///
 	/// Returns a temporary channel id.
 	pub fn connect_open_channel(
-		&self, node_id: PublicKey, address: NetAddress, channel_amount_sats: u64,
+		&self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64,
 		push_to_counterparty_msat: Option<u64>, channel_config: Option<Arc<ChannelConfig>>,
 		announce_channel: bool,
 	) -> Result<(), Error> {
@@ -1036,7 +1036,7 @@
 		&self, channel_id: &ChannelId, counterparty_node_id: PublicKey,
 	) -> Result<(), Error> {
 		self.peer_store.remove_peer(&counterparty_node_id)?;
-		match self.channel_manager.close_channel(&channel_id.0, &counterparty_node_id) {
+		match self.channel_manager.close_channel(&channel_id, &counterparty_node_id) {
 			Ok(_) => Ok(()),
 			Err(_) => Err(Error::ChannelClosingFailed),
 		}
@@ -1050,7 +1050,7 @@
 		self.channel_manager
 			.update_channel_config(
 				&counterparty_node_id,
-				&[channel_id.0],
+				&[*channel_id],
 				&(*channel_config).clone().into(),
 			)
 			.map_err(|_| Error::ChannelConfigUpdateFailed)
@@ -1160,7 +1160,7 @@
 		}
 
 		let payment_id = PaymentId(invoice.payment_hash().into_inner());
-		let payment_secret = Some(*invoice.payment_secret());
+		let payment_secret = invoice.payment_secret();
 		let expiry_time = invoice.duration_since_epoch().saturating_add(invoice.expiry_time());
 		let mut payment_params = PaymentParameters::from_node_id(
 			invoice.recover_payee_pub_key(),
@@ -1174,10 +1174,11 @@
 				.with_bolt11_features(features.clone())
 				.map_err(|_| Error::InvalidInvoice)?;
 		}
-		let route_params = RouteParameters { payment_params, final_value_msat: amount_msat };
+		let route_params =
+			RouteParameters::from_payment_params_and_value(payment_params, amount_msat);
 
 		let retry_strategy = Retry::Timeout(LDK_PAYMENT_RETRY_TIMEOUT);
-		let recipient_fields = RecipientOnionFields { payment_secret, payment_metadata: None };
+		let recipient_fields = RecipientOnionFields::secret_only(*payment_secret);
 
 		match self
 			.channel_manager
@@ -1196,7 +1197,7 @@
 				let payment = PaymentDetails {
 					hash: payment_hash,
 					preimage: None,
-					secret: payment_secret,
+					secret: Some(*payment_secret),
 					amount_msat: Some(amount_msat),
 					direction: PaymentDirection::Outbound,
 					status: PaymentStatus::Pending,
@@ -1220,7 +1221,7 @@
 				let payment = PaymentDetails {
 					hash: payment_hash,
 					preimage: None,
-					secret: payment_secret,
+					secret: Some(*payment_secret),
 					amount_msat: Some(amount_msat),
 					direction: PaymentDirection::Outbound,
 					status: PaymentStatus::Failed,
@@ -1255,13 +1256,10 @@
 			}
 		}
 
-		let route_params = RouteParameters {
-			payment_params: PaymentParameters::from_node_id(
-				node_id,
-				self.config.default_cltv_expiry_delta,
-			),
-			final_value_msat: amount_msat,
-		};
+		let route_params = RouteParameters::from_payment_params_and_value(
+			PaymentParameters::from_node_id(node_id, self.config.default_cltv_expiry_delta),
+			amount_msat,
+		);
 
 		let recipient_fields = RecipientOnionFields::spontaneous_empty();
 
 		match self.channel_manager.send_spontaneous_payment_with_retry(
@@ -1324,52 +1322,32 @@
 	/// payment. To mitigate this issue, channels with available liquidity less than the required
 	/// amount times [`Config::probing_liquidity_limit_multiplier`] won't be used to send
 	/// pre-flight probes.
- pub fn send_payment_probe(&self, invoice: &Bolt11Invoice) -> Result<(), Error> { + pub fn send_payment_probes(&self, invoice: &Bolt11Invoice) -> Result<(), Error> { let rt_lock = self.runtime.read().unwrap(); if rt_lock.is_none() { return Err(Error::NotRunning); } - let amount_msat = if let Some(invoice_amount_msat) = invoice.amount_milli_satoshis() { - invoice_amount_msat - } else { - log_error!(self.logger, "Failed to send probe as no amount was given in the invoice."); - return Err(Error::InvalidAmount); - }; + let liquidity_limit_multiplier = Some(self.config.probing_liquidity_limit_multiplier); - let expiry_time = invoice.duration_since_epoch().saturating_add(invoice.expiry_time()); - let mut payment_params = PaymentParameters::from_node_id( - invoice.recover_payee_pub_key(), - invoice.min_final_cltv_expiry_delta() as u32, + payment::preflight_probe_invoice( + invoice, + &*self.channel_manager, + liquidity_limit_multiplier, ) - .with_expiry_time(expiry_time.as_secs()) - .with_route_hints(invoice.route_hints()) - .map_err(|_| Error::InvalidInvoice)?; - if let Some(features) = invoice.features() { - payment_params = payment_params - .with_bolt11_features(features.clone()) - .map_err(|_| Error::InvalidInvoice)?; - } - let route_params = RouteParameters { payment_params, final_value_msat: amount_msat }; + .map_err(|e| { + log_error!(self.logger, "Failed to send payment probes: {:?}", e); + Error::ProbeSendingFailed + })?; - self.send_payment_probe_internal(route_params) + Ok(()) } /// Sends payment probes over all paths of a route that would be used to pay the given /// amount to the given `node_id`. /// - /// This may be used to send "pre-flight" probes, i.e., to train our scorer before conducting - /// the actual payment. Note this is only useful if there likely is sufficient time for the - /// probe to settle before sending out the actual payment, e.g., when waiting for user - /// confirmation in a wallet UI. - /// - /// Otherwise, there is a chance the probe could take up some liquidity needed to complete the - /// actual payment. Users should therefore be cautious and might avoid sending probes if - /// liquidity is scarce and/or they don't expect the probe to return before they send the - /// payment. To mitigate this issue, channels with available liquidity less than the required - /// amount times [`Config::probing_liquidity_limit_multiplier`] won't be used to send - /// pre-flight probes. - pub fn send_spontaneous_payment_probe( + /// See [`Self::send_payment_probes`] for more information. 
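+	///
+	/// A minimal usage sketch (assumes a started node and a known `node_id`):
+	/// `node.send_spontaneous_payment_probes(10_000, node_id)?;`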
+	pub fn send_spontaneous_payment_probes(
 		&self, amount_msat: u64, node_id: PublicKey,
 	) -> Result<(), Error> {
 		let rt_lock = self.runtime.read().unwrap();
@@ -1377,63 +1355,60 @@
 			return Err(Error::NotRunning);
 		}
 
-		let payment_params =
-			PaymentParameters::from_node_id(node_id, self.config.default_cltv_expiry_delta);
+		let liquidity_limit_multiplier = Some(self.config.probing_liquidity_limit_multiplier);
+		let cltv_expiry_delta = self.config.default_cltv_expiry_delta;
 
-		let route_params = RouteParameters { payment_params, final_value_msat: amount_msat };
-
-		self.send_payment_probe_internal(route_params)
-	}
-
-	fn send_payment_probe_internal(&self, route_params: RouteParameters) -> Result<(), Error> {
-		let payer = self.channel_manager.get_our_node_id();
-		let usable_channels = self.channel_manager.list_usable_channels();
-		let first_hops = usable_channels.iter().collect::<Vec<_>>();
-		let inflight_htlcs = self.channel_manager.compute_inflight_htlcs();
-
-		let route = self
-			.router
-			.find_route(&payer, &route_params, Some(&first_hops), inflight_htlcs)
+		self.channel_manager
+			.send_spontaneous_preflight_probes(
+				node_id,
+				amount_msat,
+				cltv_expiry_delta,
+				liquidity_limit_multiplier,
+			)
 			.map_err(|e| {
-				log_error!(self.logger, "Failed to find path for payment probe: {:?}", e);
+				log_error!(self.logger, "Failed to send payment probes: {:?}", e);
 				Error::ProbeSendingFailed
 			})?;
 
-		let mut used_liquidity_map = HashMap::with_capacity(first_hops.len());
-		for path in route.paths {
-			if path.hops.len() + path.blinded_tail.as_ref().map_or(0, |t| t.hops.len()) < 2 {
-				log_debug!(
+		Ok(())
+	}
+
+	/// Sends payment probes over all paths of a route that would be used to pay the given
+	/// zero-value invoice using the given amount.
+	///
+	/// This can be used to send pre-flight probes for a so-called "zero-amount" invoice, i.e., an
+	/// invoice that leaves the amount paid to be determined by the user.
+	///
+	/// See [`Self::send_payment_probes`] for more information.
+	pub fn send_payment_probes_using_amount(
+		&self, invoice: &Bolt11Invoice, amount_msat: u64,
+	) -> Result<(), Error> {
+		let rt_lock = self.runtime.read().unwrap();
+		if rt_lock.is_none() {
+			return Err(Error::NotRunning);
+		}
+
+		if let Some(invoice_amount_msat) = invoice.amount_milli_satoshis() {
+			if amount_msat < invoice_amount_msat {
+				log_error!(
 					self.logger,
-					"Skipped sending payment probe over path with less than two hops."
-				);
-				continue;
+					"Failed to send probes as the given amount needs to be at least the invoice amount: required {}msat, gave {}msat.", invoice_amount_msat, amount_msat);
+				return Err(Error::InvalidAmount);
 			}
+		}
 
-			if let Some(first_path_hop) = path.hops.first() {
-				if let Some(first_hop) = first_hops.iter().find(|h| {
-					h.get_outbound_payment_scid() == Some(first_path_hop.short_channel_id)
-				}) {
-					let path_value = path.final_value_msat() + path.fee_msat();
-					let used_liquidity =
-						used_liquidity_map.entry(first_path_hop.short_channel_id).or_insert(0);
-
-					if first_hop.next_outbound_htlc_limit_msat
-						< (*used_liquidity + path_value)
-							* self.config.probing_liquidity_limit_multiplier
-					{
-						log_debug!(self.logger, "Skipped sending payment probe to avoid putting channel {} under the liquidity limit.", first_path_hop.short_channel_id);
-						continue;
-					} else {
-						*used_liquidity += path_value;
-					}
-				}
-			}
+		let liquidity_limit_multiplier = Some(self.config.probing_liquidity_limit_multiplier);
 
-			self.channel_manager.send_probe(path).map_err(|e| {
-				log_error!(self.logger, "Failed to send payment probe: {:?}", e);
-				Error::ProbeSendingFailed
-			})?;
-		}
+		payment::preflight_probe_zero_value_invoice(
+			invoice,
+			amount_msat,
+			&*self.channel_manager,
+			liquidity_limit_multiplier,
+		)
+		.map_err(|e| {
+			log_error!(self.logger, "Failed to send payment probes: {:?}", e);
+			Error::ProbeSendingFailed
+		})?;
 
 		Ok(())
 	}
@@ -1541,7 +1516,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
 		let stored_peer = self.peer_store.get_peer(&node_id);
 		let stored_addr_opt = stored_peer.as_ref().map(|p| p.address.clone());
 		let address = match (con_addr_opt, stored_addr_opt) {
-			(Some(con_addr), _) => NetAddress(con_addr),
+			(Some(con_addr), _) => SocketAddress(con_addr),
 			(None, Some(stored_addr)) => stored_addr,
 			(None, None) => continue,
 		};
@@ -1595,7 +1570,7 @@ impl<K: KVStore + Sync + Send + 'static> Drop for Node<K> {
 	}
 }
 
 async fn connect_peer_if_necessary<K: KVStore + Sync + Send + 'static>(
-	node_id: PublicKey, addr: NetAddress, peer_manager: Arc<PeerManager<K>>,
+	node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager<K>>,
 	logger: Arc<FilesystemLogger>,
 ) -> Result<(), Error> {
 	for (pman_node_id, _pman_addr) in peer_manager.get_peer_node_ids() {
@@ -1608,7 +1583,7 @@
 }
 
 async fn do_connect_peer<K: KVStore + Sync + Send + 'static>(
-	node_id: PublicKey, addr: NetAddress, peer_manager: Arc<PeerManager<K>>,
+	node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager<K>>,
 	logger: Arc<FilesystemLogger>,
 ) -> Result<(), Error> {
 	log_info!(logger, "Connecting to peer: {}@{}", node_id, addr);
@@ -1617,7 +1592,7 @@
 		.to_socket_addrs()
 		.map_err(|e| {
 			log_error!(logger, "Failed to resolve network address: {}", e);
-			Error::InvalidNetAddress
+			Error::InvalidSocketAddress
 		})?
.next() .ok_or(Error::ConnectionFailed)?; diff --git a/src/payment_store.rs b/src/payment_store.rs index 2c64f246b..45acc4c71 100644 --- a/src/payment_store.rs +++ b/src/payment_store.rs @@ -1,9 +1,12 @@ use crate::hex_utils; -use crate::io::{KVStore, PAYMENT_INFO_PERSISTENCE_NAMESPACE}; +use crate::io::{ + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, +}; use crate::logger::{log_error, Logger}; use crate::Error; use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret}; +use lightning::util::persist::KVStore; use lightning::util::ser::Writeable; use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; @@ -123,16 +126,24 @@ where pub(crate) fn remove(&self, hash: &PaymentHash) -> Result<(), Error> { let store_key = hex_utils::to_string(&hash.0); - self.kv_store.remove(PAYMENT_INFO_PERSISTENCE_NAMESPACE, &store_key).map_err(|e| { - log_error!( - self.logger, - "Removing payment data for key {}/{} failed due to: {}", - PAYMENT_INFO_PERSISTENCE_NAMESPACE, - store_key, - e - ); - Error::PersistenceFailed - }) + self.kv_store + .remove( + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &store_key, + false, + ) + .map_err(|e| { + log_error!( + self.logger, + "Removing payment data for key {}/{}/{} failed due to: {}", + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + store_key, + e + ); + Error::PersistenceFailed + }) } pub(crate) fn get(&self, hash: &PaymentHash) -> Option { @@ -183,18 +194,24 @@ where fn persist_info(&self, hash: &PaymentHash, payment: &PaymentDetails) -> Result<(), Error> { let store_key = hex_utils::to_string(&hash.0); let data = payment.encode(); - self.kv_store.write(PAYMENT_INFO_PERSISTENCE_NAMESPACE, &store_key, &data).map_err( - |e| { + self.kv_store + .write( + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &store_key, + &data, + ) + .map_err(|e| { log_error!( self.logger, - "Write for key {}/{} failed due to: {}", - PAYMENT_INFO_PERSISTENCE_NAMESPACE, + "Write for key {}/{}/{} failed due to: {}", + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, store_key, e ); Error::PersistenceFailed - }, - )?; + })?; Ok(()) } } @@ -202,18 +219,28 @@ where #[cfg(test)] mod tests { use super::*; - use crate::test::utils::{TestLogger, TestStore}; + use crate::test::utils::TestLogger; + use lightning::util::test_utils::TestStore; use std::sync::Arc; #[test] fn payment_info_is_persisted() { - let store = Arc::new(TestStore::new()); + let store = Arc::new(TestStore::new(false)); let logger = Arc::new(TestLogger::new()); let payment_store = PaymentStore::new(Vec::new(), Arc::clone(&store), logger); let hash = PaymentHash([42u8; 32]); assert!(!payment_store.get(&hash).is_some()); + let store_key = hex_utils::to_string(&hash.0); + assert!(store + .read( + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &store_key + ) + .is_err()); + let payment = PaymentDetails { hash, preimage: None, @@ -223,18 +250,23 @@ mod tests { status: PaymentStatus::Pending, }; - assert!(!store.get_and_clear_did_persist()); - assert_eq!(Ok(false), payment_store.insert(payment.clone())); - assert!(store.get_and_clear_did_persist()); + assert!(payment_store.get(&hash).is_some()); + assert!(store + .read( + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &store_key + ) + .is_ok()); 
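+		// Inserting the same payment hash again overwrites the stored entry; `insert`
+		// returning `Ok(true)` below indicates that an existing entry was updated.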
 		assert_eq!(Ok(true), payment_store.insert(payment));
-		assert!(store.get_and_clear_did_persist());
+		assert!(payment_store.get(&hash).is_some());
 
 		let mut update = PaymentDetailsUpdate::new(hash);
 		update.status = Some(PaymentStatus::Succeeded);
 		assert_eq!(Ok(true), payment_store.update(&update));
-		assert!(store.get_and_clear_did_persist());
+		assert!(payment_store.get(&hash).is_some());
 
 		assert_eq!(PaymentStatus::Succeeded, payment_store.get(&hash).unwrap().status);
 	}
diff --git a/src/peer_store.rs b/src/peer_store.rs
index cae22d22f..648331fa1 100644
--- a/src/peer_store.rs
+++ b/src/peer_store.rs
@@ -1,8 +1,12 @@
-use crate::io::{KVStore, PEER_INFO_PERSISTENCE_KEY, PEER_INFO_PERSISTENCE_NAMESPACE};
+use crate::io::{
+	PEER_INFO_PERSISTENCE_KEY, PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
+	PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
+};
 use crate::logger::{log_error, Logger};
-use crate::{Error, NetAddress};
+use crate::{Error, SocketAddress};
 
 use lightning::impl_writeable_tlv_based;
+use lightning::util::persist::KVStore;
 use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
 
 use bitcoin::secp256k1::PublicKey;
@@ -58,12 +62,18 @@ where
 	fn persist_peers(&self, locked_peers: &HashMap<PublicKey, PeerInfo>) -> Result<(), Error> {
 		let data = PeerStoreSerWrapper(&*locked_peers).encode();
 		self.kv_store
-			.write(PEER_INFO_PERSISTENCE_NAMESPACE, PEER_INFO_PERSISTENCE_KEY, &data)
+			.write(
+				PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
+				PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
+				PEER_INFO_PERSISTENCE_KEY,
+				&data,
+			)
 			.map_err(|e| {
 				log_error!(
 					self.logger,
-					"Write for key {}/{} failed due to: {}",
-					PEER_INFO_PERSISTENCE_NAMESPACE,
+					"Write for key {}/{}/{} failed due to: {}",
+					PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
+					PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
 					PEER_INFO_PERSISTENCE_KEY,
 					e
 				);
@@ -122,7 +132,7 @@ impl Writeable for PeerStoreSerWrapper<'_> {
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub(crate) struct PeerInfo {
 	pub node_id: PublicKey,
-	pub address: NetAddress,
+	pub address: SocketAddress,
 }
 
 impl_writeable_tlv_based!(PeerInfo, {
@@ -133,13 +143,16 @@ impl_writeable_tlv_based!(PeerInfo, {
 #[cfg(test)]
 mod tests {
 	use super::*;
-	use crate::test::utils::{TestLogger, TestStore};
+	use crate::test::utils::TestLogger;
+
+	use lightning::util::test_utils::TestStore;
+
 	use std::str::FromStr;
 	use std::sync::Arc;
 
 	#[test]
 	fn peer_info_persistence() {
-		let store = Arc::new(TestStore::new());
+		let store = Arc::new(TestStore::new(false));
 		let logger = Arc::new(TestLogger::new());
 		let peer_store = PeerStore::new(Arc::clone(&store), Arc::clone(&logger));
 
@@ -147,14 +160,24 @@
 			"0276607124ebe6a6c9338517b6f485825b27c2dcc0b9fc2aa6a4c0df91194e5993",
 		)
 		.unwrap();
-		let address = NetAddress::from_str("127.0.0.1:9738").unwrap();
+		let address = SocketAddress::from_str("127.0.0.1:9738").unwrap();
 		let expected_peer_info = PeerInfo { node_id, address };
+		assert!(store
+			.read(
+				PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
+				PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
+				PEER_INFO_PERSISTENCE_KEY,
+			)
+			.is_err());
 		peer_store.add_peer(expected_peer_info.clone()).unwrap();
-		assert!(store.get_and_clear_did_persist());
 
 		// Check we can read back what we persisted.
let persisted_bytes = store - .get_persisted_bytes(PEER_INFO_PERSISTENCE_NAMESPACE, PEER_INFO_PERSISTENCE_KEY) + .read( + PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PEER_INFO_PERSISTENCE_KEY, + ) .unwrap(); let deser_peer_store = PeerStore::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap(); @@ -163,6 +186,5 @@ mod tests { assert_eq!(peers.len(), 1); assert_eq!(peers[0], expected_peer_info); assert_eq!(deser_peer_store.get_peer(&node_id), Some(expected_peer_info)); - assert!(!store.get_and_clear_did_persist()); } } diff --git a/src/test/functional_tests.rs b/src/test/functional_tests.rs index cbb847e5d..eccbf440b 100644 --- a/src/test/functional_tests.rs +++ b/src/test/functional_tests.rs @@ -1,55 +1,27 @@ use crate::builder::NodeBuilder; -use crate::io::KVStore; +use crate::io::test_utils::TestSyncStore; use crate::test::utils::*; -use crate::test::utils::{expect_event, random_config}; +use crate::test::utils::{expect_event, random_config, setup_two_nodes}; use crate::{Error, Event, Node, PaymentDirection, PaymentStatus}; use bitcoin::Amount; use electrsd::bitcoind::BitcoinD; use electrsd::ElectrsD; +use lightning::util::persist::KVStore; + +use std::sync::Arc; #[test] fn channel_full_cycle() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - println!("== Node A =="); - let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let config_a = random_config(); - let mut builder_a = NodeBuilder::from_config(config_a); - builder_a.set_esplora_server(esplora_url.clone()); - let node_a = builder_a.build().unwrap(); - node_a.start().unwrap(); - - println!("\n== Node B =="); - let config_b = random_config(); - let mut builder_b = NodeBuilder::from_config(config_b); - builder_b.set_esplora_server(esplora_url); - let node_b = builder_b.build().unwrap(); - node_b.start().unwrap(); - + let (node_a, node_b) = setup_two_nodes(&electrsd, false); do_channel_full_cycle(node_a, node_b, &bitcoind, &electrsd, false); } #[test] fn channel_full_cycle_0conf() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - println!("== Node A =="); - let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let config_a = random_config(); - let mut builder_a = NodeBuilder::from_config(config_a); - builder_a.set_esplora_server(esplora_url.clone()); - let node_a = builder_a.build().unwrap(); - node_a.start().unwrap(); - - println!("\n== Node B =="); - let mut config_b = random_config(); - config_b.trusted_peers_0conf.push(node_a.node_id()); - - let mut builder_b = NodeBuilder::from_config(config_b); - builder_b.set_esplora_server(esplora_url.clone()); - let node_b = builder_b.build().unwrap(); - - node_b.start().unwrap(); - + let (node_a, node_b) = setup_two_nodes(&electrsd, true); do_channel_full_cycle(node_a, node_b, &bitcoind, &electrsd, true) } @@ -190,8 +162,11 @@ fn do_channel_full_cycle( node_a.send_payment_using_amount(&invoice, underpaid_amount) ); + println!("\nB overpaid receive_payment"); let invoice = node_b.receive_payment(invoice_amount_2_msat, &"asdf", 9217).unwrap(); let overpaid_amount_msat = invoice_amount_2_msat + 100; + + println!("\nA overpaid send_payment"); let payment_hash = node_a.send_payment_using_amount(&invoice, overpaid_amount_msat).unwrap(); expect_event!(node_a, PaymentSuccessful); let received_amount = match node_b.wait_next_event() { @@ -213,9 +188,11 @@ fn do_channel_full_cycle( assert_eq!(node_b.payment(&payment_hash).unwrap().amount_msat, 
Some(overpaid_amount_msat)); // Test "zero-amount" invoice payment + println!("\nB receive_variable_amount_payment"); let variable_amount_invoice = node_b.receive_variable_amount_payment(&"asdf", 9217).unwrap(); let determined_amount_msat = 2345_678; assert_eq!(Err(Error::InvalidInvoice), node_a.send_payment(&variable_amount_invoice)); + println!("\nA send_payment_using_amount"); let payment_hash = node_a.send_payment_using_amount(&variable_amount_invoice, determined_amount_msat).unwrap(); @@ -238,6 +215,40 @@ fn do_channel_full_cycle( assert_eq!(node_b.payment(&payment_hash).unwrap().direction, PaymentDirection::Inbound); assert_eq!(node_b.payment(&payment_hash).unwrap().amount_msat, Some(determined_amount_msat)); + // Test spontaneous/keysend payments + println!("\nA send_spontaneous_payment"); + let keysend_amount_msat = 2500_000; + let keysend_payment_hash = + node_a.send_spontaneous_payment(keysend_amount_msat, node_b.node_id()).unwrap(); + expect_event!(node_a, PaymentSuccessful); + let received_keysend_amount = match node_b.wait_next_event() { + ref e @ Event::PaymentReceived { amount_msat, .. } => { + println!("{} got event {:?}", std::stringify!(node_b), e); + node_b.event_handled(); + amount_msat + } + ref e => { + panic!("{} got unexpected event!: {:?}", std::stringify!(node_b), e); + } + }; + assert_eq!(received_keysend_amount, keysend_amount_msat); + assert_eq!(node_a.payment(&keysend_payment_hash).unwrap().status, PaymentStatus::Succeeded); + assert_eq!( + node_a.payment(&keysend_payment_hash).unwrap().direction, + PaymentDirection::Outbound + ); + assert_eq!( + node_a.payment(&keysend_payment_hash).unwrap().amount_msat, + Some(keysend_amount_msat) + ); + assert_eq!(node_b.payment(&keysend_payment_hash).unwrap().status, PaymentStatus::Succeeded); + assert_eq!(node_b.payment(&keysend_payment_hash).unwrap().direction, PaymentDirection::Inbound); + assert_eq!( + node_b.payment(&keysend_payment_hash).unwrap().amount_msat, + Some(keysend_amount_msat) + ); + + println!("\nB close_channel"); node_b.close_channel(&channel_id, node_a.node_id()).unwrap(); expect_event!(node_a, ChannelClosed); expect_event!(node_b, ChannelClosed); @@ -248,8 +259,12 @@ fn do_channel_full_cycle( node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); - let sum_of_all_payments_sat = - (push_msat + invoice_amount_1_msat + overpaid_amount_msat + determined_amount_msat) / 1000; + let sum_of_all_payments_sat = (push_msat + + invoice_amount_1_msat + + overpaid_amount_msat + + determined_amount_msat + + keysend_amount_msat) + / 1000; let node_a_upper_bound_sat = (premine_amount_sat - funding_amount_sat) + (funding_amount_sat - sum_of_all_payments_sat); let node_a_lower_bound_sat = node_a_upper_bound_sat - onchain_fee_buffer_sat; @@ -271,21 +286,9 @@ fn do_channel_full_cycle( #[test] fn channel_open_fails_when_funds_insufficient() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - println!("== Node A =="); - let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let config_a = random_config(); - let mut builder_a = NodeBuilder::from_config(config_a); - builder_a.set_esplora_server(esplora_url.clone()); - let node_a = builder_a.build().unwrap(); - node_a.start().unwrap(); - let addr_a = node_a.new_onchain_address().unwrap(); + let (node_a, node_b) = setup_two_nodes(&electrsd, false); - println!("\n== Node B =="); - let config_b = random_config(); - let mut builder_b = NodeBuilder::from_config(config_b); - builder_b.set_esplora_server(esplora_url); - let node_b = 
builder_b.build().unwrap(); - node_b.start().unwrap(); + let addr_a = node_a.new_onchain_address().unwrap(); let addr_b = node_b.new_onchain_address().unwrap(); let premine_amount_sat = 100_000; @@ -329,68 +332,20 @@ fn connect_to_public_testnet_esplora() { #[test] fn start_stop_reinit() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); let config = random_config(); - let mut builder = NodeBuilder::from_config(config.clone()); - builder.set_esplora_server(esplora_url.clone()); - let node = builder.build().unwrap(); - let expected_node_id = node.node_id(); - - let funding_address = node.new_onchain_address().unwrap(); - let expected_amount = Amount::from_sat(100000); - premine_and_distribute_funds(&bitcoind, &electrsd, vec![funding_address], expected_amount); - assert_eq!(node.total_onchain_balance_sats().unwrap(), 0); - - node.start().unwrap(); - assert_eq!(node.start(), Err(Error::AlreadyRunning)); - - node.sync_wallets().unwrap(); - assert_eq!(node.spendable_onchain_balance_sats().unwrap(), expected_amount.to_sat()); + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let log_file_symlink = format!("{}/logs/ldk_node_latest.log", config.storage_dir_path); - assert!(std::path::Path::new(&log_file_symlink).is_symlink()); + let test_sync_store = Arc::new(TestSyncStore::new(config.storage_dir_path.clone().into())); - node.stop().unwrap(); - assert_eq!(node.stop(), Err(Error::NotRunning)); + let mut builder = NodeBuilder::from_config(config.clone()); + builder.set_esplora_server(esplora_url.clone()); + let node = builder.build_with_store(Arc::clone(&test_sync_store)).unwrap(); node.start().unwrap(); - assert_eq!(node.start(), Err(Error::AlreadyRunning)); - - node.stop().unwrap(); - assert_eq!(node.stop(), Err(Error::NotRunning)); - drop(node); - - let mut new_builder = NodeBuilder::from_config(config); - new_builder.set_esplora_server(esplora_url); - let reinitialized_node = builder.build().unwrap(); - assert_eq!(reinitialized_node.node_id(), expected_node_id); - - reinitialized_node.start().unwrap(); - - assert_eq!( - reinitialized_node.spendable_onchain_balance_sats().unwrap(), - expected_amount.to_sat() - ); - - reinitialized_node.sync_wallets().unwrap(); - assert_eq!( - reinitialized_node.spendable_onchain_balance_sats().unwrap(), - expected_amount.to_sat() - ); - - reinitialized_node.stop().unwrap(); -} -#[test] -fn start_stop_reinit_fs_store() { - let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let config = random_config(); - let mut builder = NodeBuilder::from_config(config.clone()); - builder.set_esplora_server(esplora_url.clone()); - let node = builder.build_with_fs_store().unwrap(); let expected_node_id = node.node_id(); + assert_eq!(node.start(), Err(Error::AlreadyRunning)); let funding_address = node.new_onchain_address().unwrap(); let expected_amount = Amount::from_sat(100000); @@ -398,12 +353,12 @@ fn start_stop_reinit_fs_store() { premine_and_distribute_funds(&bitcoind, &electrsd, vec![funding_address], expected_amount); assert_eq!(node.total_onchain_balance_sats().unwrap(), 0); - node.start().unwrap(); - assert_eq!(node.start(), Err(Error::AlreadyRunning)); - node.sync_wallets().unwrap(); assert_eq!(node.spendable_onchain_balance_sats().unwrap(), expected_amount.to_sat()); + let log_file_symlink = format!("{}/logs/ldk_node_latest.log", 
config.clone().storage_dir_path); + assert!(std::path::Path::new(&log_file_symlink).is_symlink()); + node.stop().unwrap(); assert_eq!(node.stop(), Err(Error::NotRunning)); @@ -414,12 +369,12 @@ fn start_stop_reinit_fs_store() { assert_eq!(node.stop(), Err(Error::NotRunning)); drop(node); - let mut new_builder = NodeBuilder::from_config(config); - new_builder.set_esplora_server(esplora_url); - let reinitialized_node = builder.build_with_fs_store().unwrap(); - assert_eq!(reinitialized_node.node_id(), expected_node_id); + let mut builder = NodeBuilder::from_config(config.clone()); + builder.set_esplora_server(esplora_url.clone()); + let reinitialized_node = builder.build_with_store(Arc::clone(&test_sync_store)).unwrap(); reinitialized_node.start().unwrap(); + assert_eq!(reinitialized_node.node_id(), expected_node_id); assert_eq!( reinitialized_node.spendable_onchain_balance_sats().unwrap(), @@ -438,20 +393,9 @@ fn start_stop_reinit_fs_store() { #[test] fn onchain_spend_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let (node_a, node_b) = setup_two_nodes(&electrsd, false); - let config_a = random_config(); - let mut builder_a = NodeBuilder::from_config(config_a); - builder_a.set_esplora_server(esplora_url.clone()); - let node_a = builder_a.build().unwrap(); - node_a.start().unwrap(); let addr_a = node_a.new_onchain_address().unwrap(); - - let config_b = random_config(); - let mut builder_b = NodeBuilder::from_config(config_b); - builder_b.set_esplora_server(esplora_url); - let node_b = builder_b.build().unwrap(); - node_b.start().unwrap(); let addr_b = node_b.new_onchain_address().unwrap(); premine_and_distribute_funds( @@ -494,13 +438,8 @@ fn onchain_spend_receive() { #[test] fn sign_verify_msg() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); let config = random_config(); - let mut builder = NodeBuilder::from_config(config.clone()); - builder.set_esplora_server(esplora_url.clone()); - let node = builder.build().unwrap(); - - node.start().unwrap(); + let node = setup_node(&electrsd, config); // Tests arbitrary message signing and later verification let msg = "OK computer".as_bytes(); diff --git a/src/test/utils.rs b/src/test/utils.rs index 31c73d288..a5090c431 100644 --- a/src/test/utils.rs +++ b/src/test/utils.rs @@ -1,8 +1,7 @@ -use crate::io::KVStore; -use crate::Config; +use crate::builder::NodeBuilder; +use crate::io::test_utils::TestSyncStore; +use crate::{Config, Node}; use lightning::util::logger::{Level, Logger, Record}; -use lightning::util::persist::KVStorePersister; -use lightning::util::ser::Writeable; use bitcoin::{Address, Amount, Network, OutPoint, Txid}; @@ -15,14 +14,11 @@ use regex; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; -use std::collections::hash_map; use std::collections::HashMap; use std::env; -use std::io::{Cursor, Read, Write}; use std::path::PathBuf; -use std::str::FromStr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; macro_rules! expect_event { @@ -41,122 +37,6 @@ macro_rules! 
diff --git a/src/test/utils.rs b/src/test/utils.rs
index 31c73d288..a5090c431 100644
--- a/src/test/utils.rs
+++ b/src/test/utils.rs
@@ -1,8 +1,7 @@
-use crate::io::KVStore;
-use crate::Config;
+use crate::builder::NodeBuilder;
+use crate::io::test_utils::TestSyncStore;
+use crate::{Config, Node};
 use lightning::util::logger::{Level, Logger, Record};
-use lightning::util::persist::KVStorePersister;
-use lightning::util::ser::Writeable;

 use bitcoin::{Address, Amount, Network, OutPoint, Txid};
@@ -15,14 +14,11 @@ use regex;

 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};

-use std::collections::hash_map;
 use std::collections::HashMap;
 use std::env;
-use std::io::{Cursor, Read, Write};
 use std::path::PathBuf;
-use std::str::FromStr;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::Arc;
+use std::sync::Mutex;
 use std::time::Duration;

 macro_rules! expect_event {
@@ -41,122 +37,6 @@ macro_rules! expect_event {
 pub(crate) use expect_event;

-pub(crate) struct TestStore {
-	persisted_bytes: RwLock<HashMap<String, HashMap<String, Arc<RwLock<Vec<u8>>>>>>,
-	did_persist: Arc<AtomicBool>,
-}
-
-impl TestStore {
-	pub fn new() -> Self {
-		let persisted_bytes = RwLock::new(HashMap::new());
-		let did_persist = Arc::new(AtomicBool::new(false));
-		Self { persisted_bytes, did_persist }
-	}
-
-	pub fn get_persisted_bytes(&self, namespace: &str, key: &str) -> Option<Vec<u8>> {
-		if let Some(outer_ref) = self.persisted_bytes.read().unwrap().get(namespace) {
-			if let Some(inner_ref) = outer_ref.get(key) {
-				let locked = inner_ref.read().unwrap();
-				return Some((*locked).clone());
-			}
-		}
-		None
-	}
-
-	pub fn get_and_clear_did_persist(&self) -> bool {
-		self.did_persist.swap(false, Ordering::Relaxed)
-	}
-}
-
-impl KVStore for TestStore {
-	type Reader = TestReader;
-
-	fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
-		if let Some(outer_ref) = self.persisted_bytes.read().unwrap().get(namespace) {
-			if let Some(inner_ref) = outer_ref.get(key) {
-				Ok(TestReader::new(Arc::clone(inner_ref)))
-			} else {
-				let msg = format!("Key not found: {}", key);
-				Err(std::io::Error::new(std::io::ErrorKind::NotFound, msg))
-			}
-		} else {
-			let msg = format!("Namespace not found: {}", namespace);
-			Err(std::io::Error::new(std::io::ErrorKind::NotFound, msg))
-		}
-	}
-
-	fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
-		let mut guard = self.persisted_bytes.write().unwrap();
-		let outer_e = guard.entry(namespace.to_string()).or_insert(HashMap::new());
-		let inner_e = outer_e.entry(key.to_string()).or_insert(Arc::new(RwLock::new(Vec::new())));
-
-		let mut guard = inner_e.write().unwrap();
-		guard.write_all(buf)?;
-		self.did_persist.store(true, Ordering::SeqCst);
-		Ok(())
-	}
-
-	fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> {
-		match self.persisted_bytes.write().unwrap().entry(namespace.to_string()) {
-			hash_map::Entry::Occupied(mut e) => {
-				self.did_persist.store(true, Ordering::SeqCst);
-				e.get_mut().remove(&key.to_string());
-				Ok(())
-			}
-			hash_map::Entry::Vacant(_) => Ok(()),
-		}
-	}
-
-	fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
-		match self.persisted_bytes.write().unwrap().entry(namespace.to_string()) {
-			hash_map::Entry::Occupied(e) => Ok(e.get().keys().cloned().collect()),
-			hash_map::Entry::Vacant(_) => Ok(Vec::new()),
-		}
-	}
-}
-
-impl KVStorePersister for TestStore {
-	fn persist<W: Writeable>(&self, prefixed_key: &str, object: &W) -> std::io::Result<()> {
-		let msg = format!("Could not persist file for key {}.", prefixed_key);
-		let dest_file = PathBuf::from_str(prefixed_key).map_err(|_| {
-			lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg.clone())
-		})?;
-
-		let parent_directory = dest_file.parent().ok_or(lightning::io::Error::new(
-			lightning::io::ErrorKind::InvalidInput,
-			msg.clone(),
-		))?;
-		let namespace = parent_directory.display().to_string();
-
-		let dest_without_namespace = dest_file
-			.strip_prefix(&namespace)
-			.map_err(|_| lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg))?;
-		let key = dest_without_namespace.display().to_string();
-
-		let data = object.encode();
-		self.write(&namespace, &key, &data)?;
-		Ok(())
-	}
-}
-
-pub struct TestReader {
-	entry_ref: Arc<RwLock<Vec<u8>>>,
-}
-
-impl TestReader {
-	pub fn new(entry_ref: Arc<RwLock<Vec<u8>>>) -> Self {
-		Self { entry_ref }
-	}
-}
-
-impl Read for TestReader {
-	fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
-		let bytes = self.entry_ref.read().unwrap().clone();
-		let mut reader = Cursor::new(bytes);
-		reader.read(buf)
-	}
-}
-
 // Copied over from upstream LDK
 #[allow(dead_code)]
 pub struct TestLogger {
@@ -241,10 +121,12 @@ impl Logger for TestLogger {
 	}
 }

-pub fn random_storage_path() -> String {
+pub fn random_storage_path() -> PathBuf {
+	let mut temp_path = std::env::temp_dir();
 	let mut rng = thread_rng();
 	let rand_dir: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
-	format!("/tmp/{}", rand_dir)
+	temp_path.push(rand_dir);
+	temp_path
 }

 pub fn random_port() -> u16 {
@@ -259,8 +141,8 @@ pub fn random_config() -> Config {
 	println!("Setting network: {}", config.network);

 	let rand_dir = random_storage_path();
-	println!("Setting random LDK storage dir: {}", rand_dir);
-	config.storage_dir_path = rand_dir;
+	println!("Setting random LDK storage dir: {}", rand_dir.display());
+	config.storage_dir_path = rand_dir.to_str().unwrap().to_owned();

 	let rand_port = random_port();
 	println!("Setting random LDK listening port: {}", rand_port);
@@ -292,6 +174,32 @@ pub fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) {
 	(bitcoind, electrsd)
 }

+pub(crate) fn setup_two_nodes(
+	electrsd: &ElectrsD, allow_0conf: bool,
+) -> (Node<TestSyncStore>, Node<TestSyncStore>) {
+	println!("== Node A ==");
+	let config_a = random_config();
+	let node_a = setup_node(electrsd, config_a);
+
+	println!("\n== Node B ==");
+	let mut config_b = random_config();
+	if allow_0conf {
+		config_b.trusted_peers_0conf.push(node_a.node_id());
+	}
+	let node_b = setup_node(electrsd, config_b);
+	(node_a, node_b)
+}
+
+pub(crate) fn setup_node(electrsd: &ElectrsD, config: Config) -> Node<TestSyncStore> {
+	let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
+	let mut builder = NodeBuilder::from_config(config.clone());
+	builder.set_esplora_server(esplora_url.clone());
+	let test_sync_store = Arc::new(TestSyncStore::new(config.storage_dir_path.into()));
+	let node = builder.build_with_store(test_sync_store).unwrap();
+	node.start().unwrap();
+	node
+}
+
 pub fn generate_blocks_and_wait(bitcoind: &BitcoinD, electrsd: &ElectrsD, num: usize) {
 	let cur_height = bitcoind.client.get_block_count().expect("failed to get current block height");
 	let address = bitcoind
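Taken together, the two helpers above collapse the per-test builder boilerplate into a single call. A minimal sketch of how a test might use them (hypothetical test, not part of the patch; it relies only on helpers shown in this file and the public `Node` API):

	#[test]
	fn example_two_nodes() {
		let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();
		// Passing `false` keeps node_a out of node_b's `trusted_peers_0conf` list.
		let (node_a, node_b) = setup_two_nodes(&electrsd, false);
		assert_ne!(node_a.node_id(), node_b.node_id());
		node_a.stop().unwrap();
		node_b.stop().unwrap();
	}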
diff --git a/src/types.rs b/src/types.rs
index b031dec69..8bcd9e028 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -3,23 +3,23 @@ use crate::wallet::{Wallet, WalletKeysManager};

 use lightning::chain::chainmonitor;
 use lightning::ln::channelmanager::ChannelDetails as LdkChannelDetails;
-use lightning::ln::msgs::NetAddress as LdkNetAddress;
 use lightning::ln::msgs::RoutingMessageHandler;
+use lightning::ln::msgs::SocketAddress as LdkSocketAddress;
 use lightning::ln::peer_handler::IgnoringMessageHandler;
+use lightning::ln::ChannelId;
 use lightning::routing::gossip;
 use lightning::routing::router::DefaultRouter;
 use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
 use lightning::sign::InMemorySigner;
 use lightning::util::config::ChannelConfig as LdkChannelConfig;
 use lightning::util::config::MaxDustHTLCExposure as LdkMaxDustHTLCExposure;
-use lightning::util::ser::{Hostname, Readable, Writeable, Writer};
+use lightning::util::ser::{Readable, Writeable, Writer};
 use lightning_net_tokio::SocketDescriptor;
 use lightning_transaction_sync::EsploraSyncClient;

 use bitcoin::secp256k1::PublicKey;
 use bitcoin::OutPoint;

-use std::convert::TryFrom;
 use std::fmt::Display;
 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs};
 use std::str::FromStr;
@@ -107,28 +107,6 @@ impl lightning::onion_message::MessageRouter for FakeMessageRouter {
 	}
 }

-/// The global identifier of a channel.
-///
-/// Note that this will start out to be a temporary ID until channel funding negotiation is
-/// finalized, at which point it will change to be a permanent global ID tied to the on-chain
-/// funding transaction.
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub struct ChannelId(pub [u8; 32]);
-
-impl Writeable for ChannelId {
-	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
-		Ok(self.0.write(writer)?)
-	}
-}
-
-impl Readable for ChannelId {
-	fn read<R: lightning::io::Read>(
-		reader: &mut R,
-	) -> Result<Self, lightning::ln::msgs::DecodeError> {
-		Ok(Self(Readable::read(reader)?))
-	}
-}
-
 /// A local, potentially user-provided, identifier of a channel.
 ///
 /// By default, this will be randomly generated for the user to ensure local uniqueness.
@@ -231,14 +209,14 @@ pub struct ChannelDetails {
 impl From<LdkChannelDetails> for ChannelDetails {
 	fn from(value: LdkChannelDetails) -> Self {
 		ChannelDetails {
-			channel_id: ChannelId(value.channel_id),
+			channel_id: value.channel_id,
 			counterparty_node_id: value.counterparty.node_id,
 			funding_txo: value.funding_txo.and_then(|o| Some(o.into_bitcoin_outpoint())),
 			channel_value_sats: value.channel_value_satoshis,
 			unspendable_punishment_reserve: value.unspendable_punishment_reserve,
 			user_channel_id: UserChannelId(value.user_channel_id),
-			balance_msat: value.balance_msat,
 			feerate_sat_per_1000_weight: value.feerate_sat_per_1000_weight.unwrap(),
+			balance_msat: value.balance_msat,
 			outbound_capacity_msat: value.outbound_capacity_msat,
 			inbound_capacity_msat: value.inbound_capacity_msat,
 			confirmations_required: value.confirmations_required,
@@ -260,7 +238,7 @@ pub struct PeerDetails {
 	/// The node ID of the peer.
 	pub node_id: PublicKey,
 	/// The network address of the peer.
-	pub address: NetAddress,
+	pub address: SocketAddress,
 	/// Indicates whether we'll try to reconnect to this peer after restarts.
 	pub is_persisted: bool,
 	/// Indicates whether we currently have an active connection with the peer.
@@ -271,26 +249,26 @@ pub struct PeerDetails {
 ///
 /// Currently only IPv4, IPv6, and DNS hostnames are supported.
 #[derive(Debug, Clone, PartialEq, Eq)]
-pub struct NetAddress(pub LdkNetAddress);
+pub struct SocketAddress(pub LdkSocketAddress);

-impl Display for NetAddress {
+impl Display for SocketAddress {
 	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 		match self.0 {
-			LdkNetAddress::IPv4 { addr, port } => {
+			LdkSocketAddress::TcpIpV4 { addr, port } => {
 				let ip_addr = Ipv4Addr::from(addr);
 				write!(f, "{}:{}", ip_addr, port)
 			}
-			LdkNetAddress::IPv6 { addr, port } => {
+			LdkSocketAddress::TcpIpV6 { addr, port } => {
 				let ip_addr = Ipv6Addr::from(addr);
 				write!(f, "[{}]:{}", ip_addr, port)
 			}
-			LdkNetAddress::Hostname { ref hostname, port } => {
+			LdkSocketAddress::Hostname { ref hostname, port } => {
 				write!(f, "{}:{}", hostname.as_str(), port)
 			}
-			LdkNetAddress::OnionV2(o) => {
+			LdkSocketAddress::OnionV2(o) => {
 				write!(f, "OnionV2 (unsupported): {:?}", o)
 			}
-			LdkNetAddress::OnionV3 { ed25519_pubkey, checksum, version, port } => write!(
+			LdkSocketAddress::OnionV3 { ed25519_pubkey, checksum, version, port } => write!(
 				f,
 				"OnionV3 (unsupported): {:?}/{:?}/{:?}/{:?}",
 				ed25519_pubkey, checksum, version, port
@@ -299,99 +277,70 @@ impl Display for NetAddress {
 	}
 }

-impl FromStr for NetAddress {
+impl FromStr for SocketAddress {
 	type Err = ();

 	fn from_str(s: &str) -> Result<Self, Self::Err> {
-		match std::net::SocketAddr::from_str(s) {
-			Ok(addr) => {
-				let port: u16 = addr.port();
-				match addr {
-					std::net::SocketAddr::V4(addr) => {
-						let addr = addr.ip().octets();
-						return Ok(Self(LdkNetAddress::IPv4 { addr, port }));
-					}
-					std::net::SocketAddr::V6(addr) => {
-						let addr = addr.ip().octets();
-						return Ok(Self(LdkNetAddress::IPv6 { addr, port }));
-					}
-				}
-			}
-			Err(_) => {
-				let trimmed_input = match s.rfind(":") {
-					Some(pos) => pos,
-					None => return Err(()),
-				};
-				let host = &s[..trimmed_input];
-				let port: u16 = match s[trimmed_input + 1..].parse() {
-					Ok(port) => port,
-					Err(_) => return Err(()),
-				};
-
-				Hostname::try_from(host.to_string())
-					.map(|hostname| Self(LdkNetAddress::Hostname { hostname, port }))
-					.map_err(|_| ())
-			}
-		}
+		Ok(Self(LdkSocketAddress::from_str(s).map_err(|_| ())?))
 	}
 }

-impl From<SocketAddr> for NetAddress {
+impl From<SocketAddr> for SocketAddress {
 	fn from(value: SocketAddr) -> Self {
 		match value {
-			SocketAddr::V4(v4addr) => NetAddress::from(v4addr),
-			SocketAddr::V6(v6addr) => NetAddress::from(v6addr),
+			SocketAddr::V4(v4addr) => SocketAddress::from(v4addr),
+			SocketAddr::V6(v6addr) => SocketAddress::from(v6addr),
 		}
 	}
 }

-impl From<SocketAddrV4> for NetAddress {
+impl From<SocketAddrV4> for SocketAddress {
 	fn from(value: SocketAddrV4) -> Self {
-		Self(LdkNetAddress::IPv4 { addr: value.ip().octets(), port: value.port() })
+		Self(LdkSocketAddress::TcpIpV4 { addr: value.ip().octets(), port: value.port() })
 	}
 }

-impl From<SocketAddrV6> for NetAddress {
+impl From<SocketAddrV6> for SocketAddress {
 	fn from(value: SocketAddrV6) -> Self {
-		Self(LdkNetAddress::IPv6 { addr: value.ip().octets(), port: value.port() })
+		Self(LdkSocketAddress::TcpIpV6 { addr: value.ip().octets(), port: value.port() })
 	}
 }

-impl ToSocketAddrs for NetAddress {
+impl ToSocketAddrs for SocketAddress {
 	type Iter = std::option::IntoIter<SocketAddr>;

 	fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
 		match self.0 {
-			LdkNetAddress::IPv4 { addr, port } => {
+			LdkSocketAddress::TcpIpV4 { addr, port } => {
 				let ip_addr = Ipv4Addr::from(addr);
 				(ip_addr, port).to_socket_addrs()
 			}
-			LdkNetAddress::IPv6 { addr, port } => {
+			LdkSocketAddress::TcpIpV6 { addr, port } => {
 				let ip_addr = Ipv6Addr::from(addr);
 				(ip_addr, port).to_socket_addrs()
 			}
-			LdkNetAddress::Hostname { ref hostname, port } => {
+			LdkSocketAddress::Hostname { ref hostname, port } => {
 				Ok((hostname.as_str(), port).to_socket_addrs()?.next().into_iter())
 			}
-			LdkNetAddress::OnionV2(..) => {
+			LdkSocketAddress::OnionV2(..) => {
 				Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
 			}
-			LdkNetAddress::OnionV3 { .. } => {
+			LdkSocketAddress::OnionV3 { .. } => {
 				Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
 			}
 		}
 	}
 }

-impl Writeable for NetAddress {
+impl Writeable for SocketAddress {
 	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), std::io::Error> {
 		self.0.write(writer)
 	}
 }

-impl Readable for NetAddress {
+impl Readable for SocketAddress {
 	fn read<R: lightning::io::Read>(reader: &mut R) -> Result<Self, lightning::ln::msgs::DecodeError> {
-		let addr: LdkNetAddress = Readable::read(reader)?;
+		let addr: LdkSocketAddress = Readable::read(reader)?;
 		Ok(Self(addr))
 	}
 }
diff --git a/src/uniffi_types.rs b/src/uniffi_types.rs
index 20c8872bb..74606d83d 100644
--- a/src/uniffi_types.rs
+++ b/src/uniffi_types.rs
@@ -2,14 +2,14 @@ use crate::UniffiCustomTypeConverter;

 use crate::error::Error;
 use crate::hex_utils;
-use crate::io::SqliteStore;
-use crate::{ChannelId, NetAddress, Node, UserChannelId};
+use crate::io::sqlite_store::SqliteStore;
+use crate::{Node, SocketAddress, UserChannelId};

 use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::hashes::Hash;
 use bitcoin::secp256k1::PublicKey;
 use bitcoin::{Address, Txid};
-use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
+use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage, PaymentSecret};
 use lightning_invoice::{Bolt11Invoice, SignedRawBolt11Invoice};

 use bip39::Mnemonic;
@@ -176,10 +176,10 @@ impl UniffiCustomTypeConverter for Mnemonic {
 	}
 }

-impl UniffiCustomTypeConverter for NetAddress {
+impl UniffiCustomTypeConverter for SocketAddress {
 	type Builtin = String;

 	fn into_custom(val: Self::Builtin) -> uniffi::Result<Self> {
-		Ok(NetAddress::from_str(&val).map_err(|_| Error::InvalidNetAddress)?)
+		Ok(SocketAddress::from_str(&val).map_err(|_| Error::InvalidSocketAddress)?)
 	}

 	fn from_custom(obj: Self) -> Self::Builtin {
diff --git a/src/wallet.rs b/src/wallet.rs
index de4fd38c9..561e92dab 100644
--- a/src/wallet.rs
+++ b/src/wallet.rs
@@ -440,6 +440,18 @@ where
 	fn sign_gossip_message(&self, msg: UnsignedGossipMessage<'_>) -> Result<Signature, ()> {
 		self.inner.sign_gossip_message(msg)
 	}
+
+	fn sign_bolt12_invoice(
+		&self, invoice: &lightning::offers::invoice::UnsignedBolt12Invoice,
+	) -> Result<bitcoin::secp256k1::schnorr::Signature, ()> {
+		self.inner.sign_bolt12_invoice(invoice)
+	}
+
+	fn sign_bolt12_invoice_request(
+		&self, invoice_request: &lightning::offers::invoice_request::UnsignedInvoiceRequest,
+	) -> Result<bitcoin::secp256k1::schnorr::Signature, ()> {
+		self.inner.sign_bolt12_invoice_request(invoice_request)
+	}
 }

 impl EntropySource for WalletKeysManager
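Since `SocketAddress::from_str` now delegates to the upstream `LdkSocketAddress` parser, every previously supported form should still parse and round-trip through the `Display` impl in `src/types.rs` above. A brief sketch (assuming those impls; the expected strings follow the `Display` arms shown):

	use std::str::FromStr;

	let v4 = SocketAddress::from_str("127.0.0.1:9735").unwrap(); // TcpIpV4
	assert_eq!(v4.to_string(), "127.0.0.1:9735");

	let v6 = SocketAddress::from_str("[::1]:9735").unwrap(); // TcpIpV6
	assert_eq!(v6.to_string(), "[::1]:9735");

	let host = SocketAddress::from_str("example.com:9735").unwrap(); // Hostname
	assert_eq!(host.to_string(), "example.com:9735");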