diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 2512153df..4162e9f15 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -13,10 +13,10 @@ interface Builder { constructor(); [Name=from_config] constructor(Config config); - Node build(); + LDKNode build(); }; -interface Node { +interface LDKNode { [Throws=NodeError] void start(); [Throws=NodeError] diff --git a/src/event.rs b/src/event.rs index 08d8466af..7e97fc49f 100644 --- a/src/event.rs +++ b/src/event.rs @@ -7,9 +7,7 @@ use crate::payment_store::{ PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentStatus, PaymentStore, }; -use crate::io::{ - KVStore, TransactionalWrite, EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_NAMESPACE, -}; +use crate::io::{KVStore, EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_NAMESPACE}; use crate::logger::{log_error, log_info, Logger}; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; @@ -108,23 +106,21 @@ impl_writeable_tlv_based_enum!(Event, }; ); -pub struct EventQueue +pub struct EventQueue where - K::Target: KVStore, L::Target: Logger, { queue: Mutex>, notifier: Condvar, - kv_store: K, + kv_store: Arc, logger: L, } -impl EventQueue +impl EventQueue where - K::Target: KVStore, L::Target: Logger, { - pub(crate) fn new(kv_store: K, logger: L) -> Self { + pub(crate) fn new(kv_store: Arc, logger: L) -> Self { let queue: Mutex> = Mutex::new(VecDeque::new()); let notifier = Condvar::new(); Self { queue, notifier, kv_store, logger } @@ -134,7 +130,7 @@ where { let mut locked_queue = self.queue.lock().unwrap(); locked_queue.push_back(event); - self.write_queue_and_commit(&locked_queue)?; + self.persist_queue(&locked_queue)?; } self.notifier.notify_one(); @@ -156,58 +152,37 @@ where { let mut locked_queue = self.queue.lock().unwrap(); locked_queue.pop_front(); - self.write_queue_and_commit(&locked_queue)?; + self.persist_queue(&locked_queue)?; } self.notifier.notify_one(); Ok(()) } - fn write_queue_and_commit(&self, locked_queue: &VecDeque) -> Result<(), Error> { - let mut writer = self - .kv_store - .write(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY) + fn persist_queue(&self, locked_queue: &VecDeque) -> Result<(), Error> { + let data = EventQueueSerWrapper(locked_queue).encode(); + self.kv_store + .write(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY, &data) .map_err(|e| { log_error!( self.logger, - "Getting writer for key {}/{} failed due to: {}", + "Write for key {}/{} failed due to: {}", EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY, e ); Error::PersistenceFailed })?; - EventQueueSerWrapper(locked_queue).write(&mut writer).map_err(|e| { - log_error!( - self.logger, - "Writing event queue data to key {}/{} failed due to: {}", - EVENT_QUEUE_PERSISTENCE_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_KEY, - e - ); - Error::PersistenceFailed - })?; - writer.commit().map_err(|e| { - log_error!( - self.logger, - "Committing event queue data to key {}/{} failed due to: {}", - EVENT_QUEUE_PERSISTENCE_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_KEY, - e - ); - Error::PersistenceFailed - })?; Ok(()) } } -impl ReadableArgs<(K, L)> for EventQueue +impl ReadableArgs<(Arc, L)> for EventQueue where - K::Target: KVStore, L::Target: Logger, { #[inline] fn read( - reader: &mut R, args: (K, L), + reader: &mut R, args: (Arc, L), ) -> Result { let (kv_store, logger) = args; let read_queue: EventQueueDeserWrapper = Readable::read(reader)?; @@ -244,14 +219,13 @@ impl Writeable for EventQueueSerWrapper<'_> { } } -pub(crate) struct EventHandler +pub(crate) struct EventHandler where - K::Target: KVStore, L::Target: Logger, { wallet: Arc>, event_queue: Arc>, - channel_manager: Arc, + channel_manager: Arc>, network_graph: Arc, keys_manager: Arc, payment_store: Arc>, @@ -260,14 +234,13 @@ where _config: Arc, } -impl EventHandler +impl EventHandler where - K::Target: KVStore, L::Target: Logger, { pub fn new( wallet: Arc>, event_queue: Arc>, - channel_manager: Arc, network_graph: Arc, + channel_manager: Arc>, network_graph: Arc, keys_manager: Arc, payment_store: Arc>, runtime: Arc>>, logger: L, _config: Arc, ) -> Self { diff --git a/src/io/fs_store.rs b/src/io/fs_store.rs index 0925ae301..f99283659 100644 --- a/src/io/fs_store.rs +++ b/src/io/fs_store.rs @@ -1,11 +1,11 @@ #[cfg(target_os = "windows")] extern crate winapi; -use super::{KVStore, TransactionalWrite}; +use super::KVStore; use std::collections::HashMap; use std::fs; -use std::io::{BufReader, BufWriter, Read, Write}; +use std::io::{BufReader, Read, Write}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; @@ -38,13 +38,14 @@ fn path_to_windows_str>(path: T) -> Vec>>>, } impl FilesystemStore { - pub fn new(dest_dir: PathBuf) -> Self { + pub(crate) fn new(dest_dir: PathBuf) -> Self { let locks = Mutex::new(HashMap::new()); Self { dest_dir, locks } } @@ -52,28 +53,87 @@ impl FilesystemStore { impl KVStore for FilesystemStore { type Reader = FilesystemReader; - type Writer = FilesystemWriter; fn read(&self, namespace: &str, key: &str) -> std::io::Result { let mut outer_lock = self.locks.lock().unwrap(); let lock_key = (namespace.to_string(), key.to_string()); let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default()); - let mut dest_file = self.dest_dir.clone(); - dest_file.push(namespace); - dest_file.push(key); - FilesystemReader::new(dest_file, inner_lock_ref) + let mut dest_file_path = self.dest_dir.clone(); + dest_file_path.push(namespace); + dest_file_path.push(key); + FilesystemReader::new(dest_file_path, inner_lock_ref) } - fn write(&self, namespace: &str, key: &str) -> std::io::Result { + fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> { let mut outer_lock = self.locks.lock().unwrap(); let lock_key = (namespace.to_string(), key.to_string()); let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default()); + let _guard = inner_lock_ref.write().unwrap(); + + let mut dest_file_path = self.dest_dir.clone(); + dest_file_path.push(namespace); + dest_file_path.push(key); + + let parent_directory = dest_file_path + .parent() + .ok_or_else(|| { + let msg = + format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })? + .to_path_buf(); + fs::create_dir_all(parent_directory.clone())?; + + // Do a crazy dance with lots of fsync()s to be overly cautious here... + // We never want to end up in a state where we've lost the old data, or end up using the + // old data on power loss after we've returned. + // The way to atomically write a file on Unix platforms is: + // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) + let mut tmp_file_path = dest_file_path.clone(); + let mut rng = thread_rng(); + let rand_str: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); + let ext = format!("{}.tmp", rand_str); + tmp_file_path.set_extension(ext); - let mut dest_file = self.dest_dir.clone(); - dest_file.push(namespace); - dest_file.push(key); - FilesystemWriter::new(dest_file, inner_lock_ref) + let mut tmp_file = fs::File::create(&tmp_file_path)?; + tmp_file.write_all(&buf)?; + tmp_file.sync_all()?; + + #[cfg(not(target_os = "windows"))] + { + fs::rename(&tmp_file_path, &dest_file_path)?; + let dir_file = fs::OpenOptions::new().read(true).open(parent_directory.clone())?; + unsafe { + libc::fsync(dir_file.as_raw_fd()); + } + } + + #[cfg(target_os = "windows")] + { + if dest_file_path.exists() { + unsafe { + winapi::um::winbase::ReplaceFileW( + path_to_windows_str(dest_file_path).as_ptr(), + path_to_windows_str(tmp_file_path).as_ptr(), + std::ptr::null(), + winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS, + std::ptr::null_mut() as *mut winapi::ctypes::c_void, + std::ptr::null_mut() as *mut winapi::ctypes::c_void, + ) + }; + } else { + call!(unsafe { + winapi::um::winbase::MoveFileExW( + path_to_windows_str(tmp_file_path).as_ptr(), + path_to_windows_str(dest_file_path).as_ptr(), + winapi::um::winbase::MOVEFILE_WRITE_THROUGH + | winapi::um::winbase::MOVEFILE_REPLACE_EXISTING, + ) + }); + } + } + Ok(()) } fn remove(&self, namespace: &str, key: &str) -> std::io::Result { @@ -83,21 +143,22 @@ impl KVStore for FilesystemStore { let _guard = inner_lock_ref.write().unwrap(); - let mut dest_file = self.dest_dir.clone(); - dest_file.push(namespace); - dest_file.push(key); + let mut dest_file_path = self.dest_dir.clone(); + dest_file_path.push(namespace); + dest_file_path.push(key); - if !dest_file.is_file() { + if !dest_file_path.is_file() { return Ok(false); } - fs::remove_file(&dest_file)?; + fs::remove_file(&dest_file_path)?; #[cfg(not(target_os = "windows"))] { - let msg = format!("Could not retrieve parent directory of {}.", dest_file.display()); - let parent_directory = dest_file - .parent() - .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidInput, msg))?; + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = + format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; unsafe { // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes @@ -110,14 +171,14 @@ impl KVStore for FilesystemStore { } } - if dest_file.is_file() { + if dest_file_path.is_file() { return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed")); } if Arc::strong_count(&inner_lock_ref) == 2 { // It's safe to remove the lock entry if we're the only one left holding a strong // reference. Checking this is necessary to ensure we continue to distribute references to the - // same lock as long as some Writers/Readers are around. However, we still want to + // same lock as long as some Readers are around. However, we still want to // clean up the table when possible. // // Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are @@ -171,8 +232,8 @@ pub struct FilesystemReader { } impl FilesystemReader { - fn new(dest_file: PathBuf, lock_ref: Arc>) -> std::io::Result { - let f = fs::File::open(dest_file.clone())?; + fn new(dest_file_path: PathBuf, lock_ref: Arc>) -> std::io::Result { + let f = fs::File::open(dest_file_path.clone())?; let inner = BufReader::new(f); Ok(Self { inner, lock_ref }) } @@ -185,115 +246,27 @@ impl Read for FilesystemReader { } } -pub struct FilesystemWriter { - dest_file: PathBuf, - parent_directory: PathBuf, - tmp_file: PathBuf, - tmp_writer: BufWriter, - lock_ref: Arc>, -} - -impl FilesystemWriter { - fn new(dest_file: PathBuf, lock_ref: Arc>) -> std::io::Result { - let msg = format!("Could not retrieve parent directory of {}.", dest_file.display()); - let parent_directory = dest_file - .parent() - .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidInput, msg))? - .to_path_buf(); - fs::create_dir_all(parent_directory.clone())?; - - // Do a crazy dance with lots of fsync()s to be overly cautious here... - // We never want to end up in a state where we've lost the old data, or end up using the - // old data on power loss after we've returned. - // The way to atomically write a file on Unix platforms is: - // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) - let mut tmp_file = dest_file.clone(); - let mut rng = thread_rng(); - let rand_str: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); - let ext = format!("{}.tmp", rand_str); - tmp_file.set_extension(ext); - - let tmp_writer = BufWriter::new(fs::File::create(&tmp_file)?); - - Ok(Self { dest_file, parent_directory, tmp_file, tmp_writer, lock_ref }) - } -} - -impl Write for FilesystemWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - Ok(self.tmp_writer.write(buf)?) - } - - fn flush(&mut self) -> std::io::Result<()> { - self.tmp_writer.flush()?; - self.tmp_writer.get_ref().sync_all()?; - Ok(()) - } -} - -impl TransactionalWrite for FilesystemWriter { - fn commit(&mut self) -> std::io::Result<()> { - self.flush()?; - - let _guard = self.lock_ref.write().unwrap(); - // Fsync the parent directory on Unix. - #[cfg(not(target_os = "windows"))] - { - fs::rename(&self.tmp_file, &self.dest_file)?; - let dir_file = fs::OpenOptions::new().read(true).open(self.parent_directory.clone())?; - unsafe { - libc::fsync(dir_file.as_raw_fd()); - } - } - - #[cfg(target_os = "windows")] - { - if dest_file.exists() { - unsafe { - winapi::um::winbase::ReplaceFileW( - path_to_windows_str(dest_file).as_ptr(), - path_to_windows_str(tmp_file).as_ptr(), - std::ptr::null(), - winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS, - std::ptr::null_mut() as *mut winapi::ctypes::c_void, - std::ptr::null_mut() as *mut winapi::ctypes::c_void, - ) - }; - } else { - call!(unsafe { - winapi::um::winbase::MoveFileExW( - path_to_windows_str(tmp_file).as_ptr(), - path_to_windows_str(dest_file).as_ptr(), - winapi::um::winbase::MOVEFILE_WRITE_THROUGH - | winapi::um::winbase::MOVEFILE_REPLACE_EXISTING, - ) - }); - } - } - Ok(()) - } -} - impl KVStorePersister for FilesystemStore { fn persist(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> { - let msg = format!("Could not persist file for key {}.", prefixed_key); - let dest_file = PathBuf::from_str(prefixed_key).map_err(|_| { - lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg.clone()) + let dest_file_path = PathBuf::from_str(prefixed_key).map_err(|_| { + let msg = format!("Could not persist file for key {}.", prefixed_key); + lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg) })?; - let parent_directory = dest_file.parent().ok_or(lightning::io::Error::new( - lightning::io::ErrorKind::InvalidInput, - msg.clone(), - ))?; + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = format!("Could not persist file for key {}.", prefixed_key); + lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg) + })?; let namespace = parent_directory.display().to_string(); - let dest_without_namespace = dest_file - .strip_prefix(&namespace) - .map_err(|_| lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg))?; + let dest_without_namespace = dest_file_path.strip_prefix(&namespace).map_err(|_| { + let msg = format!("Could not persist file for key {}.", prefixed_key); + lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg) + })?; let key = dest_without_namespace.display().to_string(); - let mut writer = self.write(&namespace, &key)?; - object.write(&mut writer)?; - Ok(writer.commit()?) + + self.write(&namespace, &key, &object.encode())?; + Ok(()) } } @@ -302,7 +275,7 @@ mod tests { use super::*; use crate::test::utils::random_storage_path; use lightning::util::persist::KVStorePersister; - use lightning::util::ser::{Readable, Writeable}; + use lightning::util::ser::Readable; use proptest::prelude::*; proptest! { @@ -315,9 +288,7 @@ mod tests { let key = "testkey"; // Test the basic KVStore operations. - let mut writer = fs_store.write(namespace, key).unwrap(); - data.write(&mut writer).unwrap(); - writer.commit().unwrap(); + fs_store.write(namespace, key, &data).unwrap(); let listed_keys = fs_store.list(namespace).unwrap(); assert_eq!(listed_keys.len(), 1); diff --git a/src/io/mod.rs b/src/io/mod.rs index 4f5008440..aa148059a 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -1,7 +1,13 @@ +//! Objects and traits for data persistence. + pub(crate) mod fs_store; pub(crate) mod utils; -use std::io::{Read, Write}; +pub use fs_store::FilesystemStore; + +use lightning::util::persist::KVStorePersister; + +use std::io::Read; // The namespacs and keys LDK uses for persisting pub(crate) const CHANNEL_MANAGER_PERSISTENCE_NAMESPACE: &str = ""; @@ -31,8 +37,8 @@ pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE: &str = ""; pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp"; /// The last time we broadcast a node announcement will be persisted under this key. -pub(crate) const LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE: &str = ""; -pub(crate) const LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY: &str = "latest_node_ann_bcast_timestamp"; +pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE: &str = ""; +pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY: &str = "latest_node_ann_bcast_timestamp"; /// Provides an interface that allows to store and retrieve persisted values that are associated /// with given keys. @@ -43,25 +49,20 @@ pub(crate) const LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY: &str = "latest_node_ann_bca /// /// Keys and namespaces are required to be valid ASCII strings and the empty namespace (`""`) is /// assumed to be valid namespace. -pub trait KVStore { +pub trait KVStore: KVStorePersister { type Reader: Read; - type Writer: TransactionalWrite; /// Returns a [`Read`] for the given `namespace` and `key` from which [`Readable`]s may be /// read. /// - /// Returns an `Err` if the given `key` could not be found in the given `namespace`. + /// Returns an [`ErrorKind::NotFound`] if the given `key` could not be found in the given `namespace`. /// /// [`Readable`]: lightning::util::ser::Readable + /// [`ErrorKind::NotFound`]: std::io::ErrorKind::NotFound fn read(&self, namespace: &str, key: &str) -> std::io::Result; - /// Returns a [`TransactionalWrite`] for the given `key` to which [`Writeable`]s may be written. + /// Persists the given data under the given `key`. /// /// Will create the given `namespace` if not already present in the store. - /// - /// Note that [`TransactionalWrite::commit`] MUST be called to commit the written data, otherwise - /// the changes won't be persisted. - /// - /// [`Writeable`]: lightning::util::ser::Writeable - fn write(&self, namespace: &str, key: &str) -> std::io::Result; + fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()>; /// Removes any data that had previously been persisted under the given `key`. /// /// Returns `true` if the `key` was present in the given `namespace`, and `false` otherwise. @@ -71,11 +72,3 @@ pub trait KVStore { /// Will return an empty list if the `namespace` is unknown. fn list(&self, namespace: &str) -> std::io::Result>; } - -/// A [`Write`] asserting data consistency. -/// -/// Note that any changes need to be `commit`ed for them to take effect, and are lost otherwise. -pub trait TransactionalWrite: Write { - /// Persist the previously made changes. - fn commit(&mut self) -> std::io::Result<()>; -} diff --git a/src/io/utils.rs b/src/io/utils.rs index f31c7587f..08cb0bbd9 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -20,6 +20,7 @@ use std::fs; use std::io::Write; use std::ops::Deref; use std::path::Path; +use std::sync::Arc; use super::KVStore; @@ -46,11 +47,10 @@ pub(crate) fn read_or_generate_seed_file(keys_seed_path: &str) -> [u8; WALLET_KE } /// Read previously persisted [`ChannelMonitor`]s from the store. -pub(crate) fn read_channel_monitors( - kv_store: K, entropy_source: ES, signer_provider: SP, +pub(crate) fn read_channel_monitors( + kv_store: Arc, entropy_source: ES, signer_provider: SP, ) -> std::io::Result::Signer>)>> where - K::Target: KVStore, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, { @@ -92,74 +92,65 @@ where } /// Read a previously persisted [`NetworkGraph`] from the store. -pub(crate) fn read_network_graph( - kv_store: K, logger: L, +pub(crate) fn read_network_graph( + kv_store: Arc, logger: L, ) -> Result, std::io::Error> where - K::Target: KVStore, L::Target: Logger, { let mut reader = kv_store.read(NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY)?; - let graph = NetworkGraph::read(&mut reader, logger).map_err(|_| { + NetworkGraph::read(&mut reader, logger).map_err(|_| { std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NetworkGraph") - })?; - Ok(graph) + }) } /// Read a previously persisted [`Scorer`] from the store. -pub(crate) fn read_scorer>, L: Deref>( - kv_store: K, network_graph: G, logger: L, +pub(crate) fn read_scorer>, L: Deref>( + kv_store: Arc, network_graph: G, logger: L, ) -> Result, std::io::Error> where - K::Target: KVStore, L::Target: Logger, { let params = ProbabilisticScoringParameters::default(); let mut reader = kv_store.read(SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_KEY)?; let args = (params, network_graph, logger); - let scorer = ProbabilisticScorer::read(&mut reader, args).map_err(|_| { + ProbabilisticScorer::read(&mut reader, args).map_err(|_| { std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer") - })?; - Ok(scorer) + }) } /// Read previously persisted events from the store. -pub(crate) fn read_event_queue( - kv_store: K, logger: L, +pub(crate) fn read_event_queue( + kv_store: Arc, logger: L, ) -> Result, std::io::Error> where - K::Target: KVStore, L::Target: Logger, { let mut reader = kv_store.read(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY)?; - let event_queue = EventQueue::read(&mut reader, (kv_store, logger)).map_err(|_| { + EventQueue::read(&mut reader, (kv_store, logger)).map_err(|_| { std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize EventQueue") - })?; - Ok(event_queue) + }) } /// Read previously persisted peer info from the store. -pub(crate) fn read_peer_info( - kv_store: K, logger: L, +pub(crate) fn read_peer_info( + kv_store: Arc, logger: L, ) -> Result, std::io::Error> where - K::Target: KVStore, L::Target: Logger, { let mut reader = kv_store.read(PEER_INFO_PERSISTENCE_NAMESPACE, PEER_INFO_PERSISTENCE_KEY)?; - let peer_info = PeerStore::read(&mut reader, (kv_store, logger)).map_err(|_| { + PeerStore::read(&mut reader, (kv_store, logger)).map_err(|_| { std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize PeerStore") - })?; - Ok(peer_info) + }) } /// Read previously persisted payments information from the store. -pub(crate) fn read_payments(kv_store: K) -> Result, std::io::Error> -where - K::Target: KVStore, -{ +pub(crate) fn read_payments( + kv_store: Arc, +) -> Result, std::io::Error> { let mut res = Vec::new(); for stored_key in kv_store.list(PAYMENT_INFO_PERSISTENCE_NAMESPACE)? { @@ -174,10 +165,9 @@ where Ok(res) } -pub(crate) fn read_latest_rgs_sync_timestamp(kv_store: K) -> Result -where - K::Target: KVStore, -{ +pub(crate) fn read_latest_rgs_sync_timestamp( + kv_store: Arc, +) -> Result { let mut reader = kv_store.read(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY)?; u32::read(&mut reader).map_err(|_| { @@ -188,55 +178,32 @@ where }) } -pub(crate) fn write_latest_rgs_sync_timestamp( - updated_timestamp: u32, kv_store: K, logger: L, +pub(crate) fn write_latest_rgs_sync_timestamp( + updated_timestamp: u32, kv_store: Arc, logger: L, ) -> Result<(), Error> where - K::Target: KVStore, L::Target: Logger, { - let mut writer = kv_store - .write(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY) + let data = updated_timestamp.encode(); + kv_store + .write(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY, &data) .map_err(|e| { log_error!( logger, - "Getting writer for key {}/{} failed due to: {}", + "Writing data to key {}/{} failed due to: {}", LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY, e ); Error::PersistenceFailed - })?; - updated_timestamp.write(&mut writer).map_err(|e| { - log_error!( - logger, - "Writing data to key {}/{} failed due to: {}", - LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, - LATEST_RGS_SYNC_TIMESTAMP_KEY, - e - ); - Error::PersistenceFailed - })?; - writer.commit().map_err(|e| { - log_error!( - logger, - "Committing data to key {}/{} failed due to: {}", - LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, - LATEST_RGS_SYNC_TIMESTAMP_KEY, - e - ); - Error::PersistenceFailed - }) + }) } -pub(crate) fn read_latest_node_ann_bcast_timestamp( - kv_store: K, -) -> Result -where - K::Target: KVStore, -{ +pub(crate) fn read_latest_node_ann_bcast_timestamp( + kv_store: Arc, +) -> Result { let mut reader = kv_store - .read(LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY)?; + .read(LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY)?; u64::read(&mut reader).map_err(|_| { std::io::Error::new( std::io::ErrorKind::InvalidData, @@ -245,43 +212,27 @@ where }) } -pub(crate) fn write_latest_node_ann_bcast_timestamp( - updated_timestamp: u64, kv_store: K, logger: L, +pub(crate) fn write_latest_node_ann_bcast_timestamp( + updated_timestamp: u64, kv_store: Arc, logger: L, ) -> Result<(), Error> where - K::Target: KVStore, L::Target: Logger, { - let mut writer = kv_store - .write(LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY) + let data = updated_timestamp.encode(); + kv_store + .write( + LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE, + LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY, + &data, + ) .map_err(|e| { log_error!( logger, - "Getting writer for key {}/{} failed due to: {}", - LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE, - LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY, + "Writing data to key {}/{} failed due to: {}", + LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE, + LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY, e ); Error::PersistenceFailed - })?; - updated_timestamp.write(&mut writer).map_err(|e| { - log_error!( - logger, - "Writing data to key {}/{} failed due to: {}", - LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE, - LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY, - e - ); - Error::PersistenceFailed - })?; - writer.commit().map_err(|e| { - log_error!( - logger, - "Committing data to key {}/{} failed due to: {}", - LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE, - LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY, - e - ); - Error::PersistenceFailed - }) + }) } diff --git a/src/lib.rs b/src/lib.rs index cfbaa3bb6..086701371 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,7 @@ mod error; mod event; mod gossip; mod hex_utils; -mod io; +pub mod io; mod logger; mod payment_store; mod peer_store; @@ -320,8 +320,18 @@ impl Builder { self } + /// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options + /// previously configured. + pub fn build(&self) -> Arc> { + let ldk_data_dir = format!("{}/ldk", self.config.storage_dir_path); + let kv_store = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into())); + self.build_with_store(kv_store) + } + /// Builds a [`Node`] instance according to the options previously configured. - pub fn build(&self) -> Arc { + pub fn build_with_store( + &self, kv_store: Arc, + ) -> Arc> { let config = Arc::new(self.config.clone()); let ldk_data_dir = format!("{}/ldk", config.storage_dir_path); @@ -392,10 +402,8 @@ impl Builder { Arc::clone(&logger), )); - let kv_store = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into())); - // Initialize the ChainMonitor - let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( + let chain_monitor: Arc> = Arc::new(chainmonitor::ChainMonitor::new( Some(Arc::clone(&tx_sync)), Arc::clone(&wallet), Arc::clone(&logger), @@ -492,7 +500,7 @@ impl Builder { channel_monitor_references, ); let (_hash, channel_manager) = - <(BlockHash, ChannelManager)>::read(&mut reader, read_args) + <(BlockHash, ChannelManager)>::read(&mut reader, read_args) .expect("Failed to read channel manager from store"); channel_manager } else { @@ -662,31 +670,35 @@ impl Builder { } } +/// This type alias is required as Uniffi doesn't support generics, i.e., we can only expose the +/// concretized types via this aliasing hack. +type LDKNode = Node; + /// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities. /// /// Needs to be initialized and instantiated through [`Builder::build`]. -pub struct Node { +pub struct Node { runtime: Arc>>, stop_sender: tokio::sync::watch::Sender<()>, stop_receiver: tokio::sync::watch::Receiver<()>, config: Arc, wallet: Arc>, tx_sync: Arc>>, - event_queue: Arc, Arc>>, - channel_manager: Arc, - chain_monitor: Arc, - peer_manager: Arc, + event_queue: Arc>>, + channel_manager: Arc>, + chain_monitor: Arc>, + peer_manager: Arc>, keys_manager: Arc, network_graph: Arc, gossip_source: Arc, - kv_store: Arc, + kv_store: Arc, logger: Arc, scorer: Arc>, - peer_store: Arc, Arc>>, - payment_store: Arc, Arc>>, + peer_store: Arc>>, + payment_store: Arc>>, } -impl Node { +impl Node { /// Starts the necessary background tasks, such as handling events coming from user input, /// LDK/BDK, and the peer-to-peer network. /// @@ -1682,14 +1694,14 @@ impl Node { } } -impl Drop for Node { +impl Drop for Node { fn drop(&mut self) { let _ = self.stop(); } } -async fn connect_peer_if_necessary( - node_id: PublicKey, addr: NetAddress, peer_manager: Arc, +async fn connect_peer_if_necessary( + node_id: PublicKey, addr: NetAddress, peer_manager: Arc>, logger: Arc, ) -> Result<(), Error> { for (pman_node_id, _pman_addr) in peer_manager.get_peer_node_ids() { @@ -1701,8 +1713,8 @@ async fn connect_peer_if_necessary( do_connect_peer(node_id, addr, peer_manager, logger).await } -async fn do_connect_peer( - node_id: PublicKey, addr: NetAddress, peer_manager: Arc, +async fn do_connect_peer( + node_id: PublicKey, addr: NetAddress, peer_manager: Arc>, logger: Arc, ) -> Result<(), Error> { log_info!(logger, "Connecting to peer: {}@{}", node_id, addr); diff --git a/src/payment_store.rs b/src/payment_store.rs index 80da72c0f..fb18b2efa 100644 --- a/src/payment_store.rs +++ b/src/payment_store.rs @@ -1,5 +1,5 @@ use crate::hex_utils; -use crate::io::{KVStore, TransactionalWrite, PAYMENT_INFO_PERSISTENCE_NAMESPACE}; +use crate::io::{KVStore, PAYMENT_INFO_PERSISTENCE_NAMESPACE}; use crate::logger::{log_error, Logger}; use crate::Error; @@ -10,7 +10,7 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use std::collections::HashMap; use std::iter::FromIterator; use std::ops::Deref; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; /// Represents a payment. #[derive(Clone, Debug, PartialEq, Eq)] @@ -92,22 +92,20 @@ impl PaymentDetailsUpdate { } } -pub(crate) struct PaymentStore +pub(crate) struct PaymentStore where - K::Target: KVStore, L::Target: Logger, { payments: Mutex>, - kv_store: K, + kv_store: Arc, logger: L, } -impl PaymentStore +impl PaymentStore where - K::Target: KVStore, L::Target: Logger, { - pub(crate) fn new(payments: Vec, kv_store: K, logger: L) -> Self { + pub(crate) fn new(payments: Vec, kv_store: Arc, logger: L) -> Self { let payments = Mutex::new(HashMap::from_iter( payments.into_iter().map(|payment| (payment.hash, payment)), )); @@ -119,7 +117,7 @@ where let hash = payment.hash.clone(); let updated = locked_payments.insert(hash.clone(), payment.clone()).is_some(); - self.write_info_and_commit(&hash, &payment)?; + self.persist_info(&hash, &payment)?; Ok(updated) } @@ -166,7 +164,7 @@ where payment.status = status; } - self.write_info_and_commit(&update.hash, payment)?; + self.persist_info(&update.hash, payment)?; updated = true; } @@ -186,41 +184,22 @@ where .collect::>() } - fn write_info_and_commit( - &self, hash: &PaymentHash, payment: &PaymentDetails, - ) -> Result<(), Error> { + fn persist_info(&self, hash: &PaymentHash, payment: &PaymentDetails) -> Result<(), Error> { let store_key = hex_utils::to_string(&hash.0); - let mut writer = - self.kv_store.write(PAYMENT_INFO_PERSISTENCE_NAMESPACE, &store_key).map_err(|e| { + let data = payment.encode(); + self.kv_store.write(PAYMENT_INFO_PERSISTENCE_NAMESPACE, &store_key, &data).map_err( + |e| { log_error!( self.logger, - "Getting writer for key {}/{} failed due to: {}", + "Write for key {}/{} failed due to: {}", PAYMENT_INFO_PERSISTENCE_NAMESPACE, store_key, e ); Error::PersistenceFailed - })?; - payment.write(&mut writer).map_err(|e| { - log_error!( - self.logger, - "Writing payment data for key {}/{} failed due to: {}", - PAYMENT_INFO_PERSISTENCE_NAMESPACE, - store_key, - e - ); - Error::PersistenceFailed - })?; - writer.commit().map_err(|e| { - log_error!( - self.logger, - "Committing payment data for key {}/{} failed due to: {}", - PAYMENT_INFO_PERSISTENCE_NAMESPACE, - store_key, - e - ); - Error::PersistenceFailed - }) + }, + )?; + Ok(()) } } diff --git a/src/peer_store.rs b/src/peer_store.rs index 90cdae63c..d22560aa1 100644 --- a/src/peer_store.rs +++ b/src/peer_store.rs @@ -1,6 +1,4 @@ -use crate::io::{ - KVStore, TransactionalWrite, PEER_INFO_PERSISTENCE_KEY, PEER_INFO_PERSISTENCE_NAMESPACE, -}; +use crate::io::{KVStore, PEER_INFO_PERSISTENCE_KEY, PEER_INFO_PERSISTENCE_NAMESPACE}; use crate::logger::{log_error, Logger}; use crate::{Error, NetAddress}; @@ -11,24 +9,22 @@ use bitcoin::secp256k1::PublicKey; use std::collections::HashMap; use std::ops::Deref; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; -pub struct PeerStore +pub struct PeerStore where - K::Target: KVStore, L::Target: Logger, { peers: RwLock>, - kv_store: K, + kv_store: Arc, logger: L, } -impl PeerStore +impl PeerStore where - K::Target: KVStore, L::Target: Logger, { - pub(crate) fn new(kv_store: K, logger: L) -> Self { + pub(crate) fn new(kv_store: Arc, logger: L) -> Self { let peers = RwLock::new(HashMap::new()); Self { peers, kv_store, logger } } @@ -37,14 +33,14 @@ where let mut locked_peers = self.peers.write().unwrap(); locked_peers.insert(peer_info.node_id, peer_info); - self.write_peers_and_commit(&*locked_peers) + self.persist_peers(&*locked_peers) } pub(crate) fn remove_peer(&self, node_id: &PublicKey) -> Result<(), Error> { let mut locked_peers = self.peers.write().unwrap(); locked_peers.remove(node_id); - self.write_peers_and_commit(&*locked_peers) + self.persist_peers(&*locked_peers) } pub(crate) fn list_peers(&self) -> Vec { @@ -55,53 +51,31 @@ where self.peers.read().unwrap().get(node_id).cloned() } - fn write_peers_and_commit( - &self, locked_peers: &HashMap, - ) -> Result<(), Error> { - let mut writer = self - .kv_store - .write(PEER_INFO_PERSISTENCE_NAMESPACE, PEER_INFO_PERSISTENCE_KEY) + fn persist_peers(&self, locked_peers: &HashMap) -> Result<(), Error> { + let data = PeerStoreSerWrapper(&*locked_peers).encode(); + self.kv_store + .write(PEER_INFO_PERSISTENCE_NAMESPACE, PEER_INFO_PERSISTENCE_KEY, &data) .map_err(|e| { log_error!( self.logger, - "Getting writer for key {}/{} failed due to: {}", + "Write for key {}/{} failed due to: {}", PEER_INFO_PERSISTENCE_NAMESPACE, PEER_INFO_PERSISTENCE_KEY, e ); Error::PersistenceFailed })?; - PeerStoreSerWrapper(&*locked_peers).write(&mut writer).map_err(|e| { - log_error!( - self.logger, - "Writing peer data to key {}/{} failed due to: {}", - PEER_INFO_PERSISTENCE_NAMESPACE, - PEER_INFO_PERSISTENCE_KEY, - e - ); - Error::PersistenceFailed - })?; - writer.commit().map_err(|e| { - log_error!( - self.logger, - "Committing peer data to key {}/{} failed due to: {}", - PEER_INFO_PERSISTENCE_NAMESPACE, - PEER_INFO_PERSISTENCE_KEY, - e - ); - Error::PersistenceFailed - }) + Ok(()) } } -impl ReadableArgs<(K, L)> for PeerStore +impl ReadableArgs<(Arc, L)> for PeerStore where - K::Target: KVStore, L::Target: Logger, { #[inline] fn read( - reader: &mut R, args: (K, L), + reader: &mut R, args: (Arc, L), ) -> Result { let (kv_store, logger) = args; let read_peers: PeerStoreDeserWrapper = Readable::read(reader)?; diff --git a/src/test/utils.rs b/src/test/utils.rs index 621bbd683..c396b21a8 100644 --- a/src/test/utils.rs +++ b/src/test/utils.rs @@ -1,4 +1,4 @@ -use crate::io::{KVStore, TransactionalWrite}; +use crate::io::KVStore; use crate::Config; use lightning::util::logger::{Level, Logger, Record}; use lightning::util::persist::KVStorePersister; @@ -18,7 +18,7 @@ use rand::{thread_rng, Rng}; use std::collections::hash_map; use std::collections::HashMap; use std::env; -use std::io::{BufWriter, Cursor, Read, Write}; +use std::io::{Cursor, Read, Write}; use std::path::PathBuf; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; @@ -70,7 +70,6 @@ impl TestStore { impl KVStore for TestStore { type Reader = TestReader; - type Writer = TestWriter; fn read(&self, namespace: &str, key: &str) -> std::io::Result { if let Some(outer_ref) = self.persisted_bytes.read().unwrap().get(namespace) { @@ -86,11 +85,15 @@ impl KVStore for TestStore { } } - fn write(&self, namespace: &str, key: &str) -> std::io::Result { + fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> { let mut guard = self.persisted_bytes.write().unwrap(); let outer_e = guard.entry(namespace.to_string()).or_insert(HashMap::new()); let inner_e = outer_e.entry(key.to_string()).or_insert(Arc::new(RwLock::new(Vec::new()))); - Ok(TestWriter::new(Arc::clone(&inner_e), Arc::clone(&self.did_persist))) + + let mut guard = inner_e.write().unwrap(); + guard.write_all(buf)?; + self.did_persist.store(true, Ordering::SeqCst); + Ok(()) } fn remove(&self, namespace: &str, key: &str) -> std::io::Result { @@ -128,9 +131,9 @@ impl KVStorePersister for TestStore { .strip_prefix(&namespace) .map_err(|_| lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg))?; let key = dest_without_namespace.display().to_string(); - let mut writer = self.write(&namespace, &key)?; - object.write(&mut writer)?; - writer.commit()?; + + let data = object.encode(); + self.write(&namespace, &key, &data)?; Ok(()) } } @@ -153,40 +156,6 @@ impl Read for TestReader { } } -pub struct TestWriter { - tmp_inner: BufWriter>, - entry_ref: Arc>>, - did_persist: Arc, -} - -impl TestWriter { - pub fn new(entry_ref: Arc>>, did_persist: Arc) -> Self { - let tmp_inner = BufWriter::new(Vec::new()); - Self { tmp_inner, entry_ref, did_persist } - } -} - -impl Write for TestWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.tmp_inner.write(buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - self.tmp_inner.flush() - } -} - -impl TransactionalWrite for TestWriter { - fn commit(&mut self) -> std::io::Result<()> { - self.flush()?; - let bytes_ref = self.tmp_inner.get_ref(); - let mut guard = self.entry_ref.write().unwrap(); - guard.clone_from(bytes_ref); - self.did_persist.store(true, Ordering::SeqCst); - Ok(()) - } -} - // Copied over from upstream LDK #[allow(dead_code)] pub struct TestLogger { diff --git a/src/types.rs b/src/types.rs index f7f8aa911..92d957246 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,6 +1,5 @@ use crate::error::Error; use crate::hex_utils; -use crate::io::fs_store::FilesystemStore; use crate::logger::FilesystemLogger; use crate::wallet::{Wallet, WalletKeysManager}; use crate::UniffiCustomTypeConverter; @@ -32,18 +31,18 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSoc use std::str::FromStr; use std::sync::{Arc, Mutex}; -pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< +pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc>>, Arc>, Arc>, Arc, - Arc, + Arc, >; -pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< +pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< SocketDescriptor, - Arc, + Arc>, Arc, Arc, Arc, @@ -51,8 +50,8 @@ pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< Arc>, >; -pub(crate) type ChannelManager = lightning::ln::channelmanager::ChannelManager< - Arc, +pub(crate) type ChannelManager = lightning::ln::channelmanager::ChannelManager< + Arc>, Arc>, Arc>, Arc>,