diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6039c54c9..27b56043b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -48,6 +48,7 @@ jobs: cargo update -p regex --precise "1.9.6" --verbose # regex 1.10.0 requires rustc 1.65.0 cargo update -p jobserver --precise "0.1.26" --verbose # jobserver 0.1.27 requires rustc 1.66.0 cargo update -p zstd-sys --precise "2.0.8+zstd.1.5.5" --verbose # zstd-sys 2.0.9+zstd.1.5.5 requires rustc 1.64.0 + cargo update -p petgraph --precise "0.6.3" --verbose # petgraph v0.6.4, requires rustc 1.64 or newer - name: Build on Rust ${{ matrix.toolchain }} run: cargo build --verbose --color always - name: Build with UniFFI support on Rust ${{ matrix.toolchain }} diff --git a/Cargo.toml b/Cargo.toml index cd21ea6eb..9a11ea556 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ tokio = { version = "1", default-features = false, features = [ "rt-multi-thread esplora-client = { version = "0.4", default-features = false } libc = "0.2" uniffi = { version = "0.23.0", features = ["build"], optional = true } +vss-client = "0.1" [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } diff --git a/src/builder.rs b/src/builder.rs index 3a6b1e6b4..533a90725 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -37,6 +37,8 @@ use lightning_persister::fs_store::FilesystemStore; use lightning_transaction_sync::EsploraSyncClient; +#[cfg(any(vss, vss_test))] +use crate::io::vss_store::VssStore; use bdk::bitcoin::secp256k1::Secp256k1; use bdk::blockchain::esplora::EsploraBlockchain; use bdk::database::SqliteDatabase; @@ -269,6 +271,16 @@ impl NodeBuilder { self.build_with_store(kv_store) } + /// Builds a [`Node`] instance with a [`VssStore`] backend and according to the options + /// previously configured. + #[cfg(any(vss, vss_test))] + pub fn build_with_vss_store( + &self, url: &str, store_id: String, + ) -> Result, BuildError> { + let vss = Arc::new(VssStore::new(url, store_id)); + self.build_with_store(vss) + } + /// Builds a [`Node`] instance according to the options previously configured. pub fn build_with_store( &self, kv_store: Arc, diff --git a/src/io/mod.rs b/src/io/mod.rs index fbed6cfbb..a7d1085c5 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -4,6 +4,8 @@ pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; pub(crate) mod utils; +#[cfg(any(vss, vss_test))] +pub(crate) mod vss_store; /// The event queue will be persisted under this key. pub(crate) const EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE: &str = ""; diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs new file mode 100644 index 000000000..c48e44c86 --- /dev/null +++ b/src/io/vss_store.rs @@ -0,0 +1,194 @@ +use io::Error; +use std::io; +use std::io::ErrorKind; +#[cfg(test)] +use std::panic::RefUnwindSafe; + +use crate::io::utils::check_namespace_key_validity; +use lightning::util::persist::KVStore; +use tokio::runtime::Runtime; +use vss_client::client::VssClient; +use vss_client::error::VssError; +use vss_client::types::{ + DeleteObjectRequest, GetObjectRequest, KeyValue, ListKeyVersionsRequest, PutObjectRequest, +}; + +/// A [`KVStore`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend. +pub struct VssStore { + client: VssClient, + store_id: String, + runtime: Runtime, +} + +impl VssStore { + pub(crate) fn new(base_url: &str, store_id: String) -> Self { + let client = VssClient::new(base_url); + let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); + Self { client, store_id, runtime } + } + + fn build_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result { + if primary_namespace.is_empty() { + Ok(key.to_string()) + } else { + Ok(format!("{}#{}#{}", primary_namespace, secondary_namespace, key)) + } + } + + fn extract_key(&self, unified_key: &str) -> io::Result { + let mut parts = unified_key.splitn(3, '#'); + let (_primary_namespace, _secondary_namespace) = (parts.next(), parts.next()); + match parts.next() { + Some(actual_key) => Ok(actual_key.to_string()), + None => Err(Error::new(ErrorKind::InvalidData, "Invalid key format")), + } + } + + async fn list_all_keys( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + let mut page_token = None; + let mut keys = vec![]; + let key_prefix = format!("{}#{}", primary_namespace, secondary_namespace); + while page_token != Some("".to_string()) { + let request = ListKeyVersionsRequest { + store_id: self.store_id.clone(), + key_prefix: Some(key_prefix.clone()), + page_token, + page_size: None, + }; + + let response = self.client.list_key_versions(&request).await.map_err(|e| { + let msg = format!( + "Failed to list keys in {}/{}: {}", + primary_namespace, secondary_namespace, e + ); + Error::new(ErrorKind::Other, msg) + })?; + + for kv in response.key_versions { + keys.push(self.extract_key(&kv.key)?); + } + page_token = response.next_page_token; + } + Ok(keys) + } +} + +impl KVStore for VssStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; + let request = GetObjectRequest { + store_id: self.store_id.clone(), + key: self.build_key(primary_namespace, secondary_namespace, key)?, + }; + + let resp = + tokio::task::block_in_place(|| self.runtime.block_on(self.client.get_object(&request))) + .map_err(|e| { + let msg = format!( + "Failed to read from key {}/{}/{}: {}", + primary_namespace, secondary_namespace, key, e + ); + match e { + VssError::NoSuchKeyError(..) => Error::new(ErrorKind::NotFound, msg), + _ => Error::new(ErrorKind::Other, msg), + } + })?; + Ok(resp.value.unwrap().value) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> io::Result<()> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; + let request = PutObjectRequest { + store_id: self.store_id.clone(), + global_version: None, + transaction_items: vec![KeyValue { + key: self.build_key(primary_namespace, secondary_namespace, key)?, + version: -1, + value: buf.to_vec(), + }], + delete_items: vec![], + }; + + tokio::task::block_in_place(|| self.runtime.block_on(self.client.put_object(&request))) + .map_err(|e| { + let msg = format!( + "Failed to write to key {}/{}/{}: {}", + primary_namespace, secondary_namespace, key, e + ); + Error::new(ErrorKind::Other, msg) + })?; + + Ok(()) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; + let request = DeleteObjectRequest { + store_id: self.store_id.clone(), + key_value: Some(KeyValue { + key: self.build_key(primary_namespace, secondary_namespace, key)?, + version: -1, + value: vec![], + }), + }; + + tokio::task::block_in_place(|| self.runtime.block_on(self.client.delete_object(&request))) + .map_err(|e| { + let msg = format!( + "Failed to delete key {}/{}/{}: {}", + primary_namespace, secondary_namespace, key, e + ); + Error::new(ErrorKind::Other, msg) + })?; + Ok(()) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; + + let keys = tokio::task::block_in_place(|| { + self.runtime.block_on(self.list_all_keys(primary_namespace, secondary_namespace)) + }) + .map_err(|e| { + let msg = format!( + "Failed to retrieve keys in namespace: {}/{} : {}", + primary_namespace, secondary_namespace, e + ); + Error::new(ErrorKind::Other, msg) + })?; + + Ok(keys) + } +} + +#[cfg(test)] +impl RefUnwindSafe for VssStore {} + +#[cfg(test)] +#[cfg(vss_test)] +mod tests { + use super::*; + use crate::io::test_utils::do_read_write_remove_list_persist; + use rand::distributions::Alphanumeric; + use rand::{thread_rng, Rng}; + + #[test] + fn read_write_remove_list_persist() { + let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); + let mut rng = thread_rng(); + let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); + let vss_store = VssStore::new(&vss_base_url, rand_store_id); + + do_read_write_remove_list_persist(&vss_store); + } +} diff --git a/src/test/functional_tests.rs b/src/test/functional_tests.rs index e409c8813..4909aabc1 100644 --- a/src/test/functional_tests.rs +++ b/src/test/functional_tests.rs @@ -18,6 +18,29 @@ fn channel_full_cycle() { do_channel_full_cycle(node_a, node_b, &bitcoind, &electrsd, false); } +#[test] +#[cfg(vss_test)] +fn channel_full_cycle_with_vss_store() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + println!("== Node A =="); + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let config_a = random_config(); + let mut builder_a = NodeBuilder::from_config(config_a); + builder_a.set_esplora_server(esplora_url.clone()); + let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); + let node_a = builder_a.build_with_vss_store(&vss_base_url, "node_1_store".to_string()).unwrap(); + node_a.start().unwrap(); + + println!("\n== Node B =="); + let config_b = random_config(); + let mut builder_b = NodeBuilder::from_config(config_b); + builder_b.set_esplora_server(esplora_url); + let node_b = builder_b.build_with_vss_store(&vss_base_url, "node_2_store".to_string()).unwrap(); + node_b.start().unwrap(); + + do_channel_full_cycle(node_a, node_b, &bitcoind, &electrsd, false); +} + #[test] fn channel_full_cycle_0conf() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();