Skip to content

Commit fe43b78

Browse files
committed
Add VSS Store Implementation
A KVStore implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
1 parent d78cab0 commit fe43b78

File tree

6 files changed

+233
-0
lines changed

6 files changed

+233
-0
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ jobs:
4848
cargo update -p regex --precise "1.9.6" --verbose # regex 1.10.0 requires rustc 1.65.0
4949
cargo update -p jobserver --precise "0.1.26" --verbose # jobserver 0.1.27 requires rustc 1.66.0
5050
cargo update -p zstd-sys --precise "2.0.8+zstd.1.5.5" --verbose # zstd-sys 2.0.9+zstd.1.5.5 requires rustc 1.64.0
51+
cargo update -p petgraph --precise "0.6.3" --verbose # petgraph v0.6.4, requires rustc 1.64 or newer
5152
- name: Build on Rust ${{ matrix.toolchain }}
5253
run: cargo build --verbose --color always
5354
- name: Build with UniFFI support on Rust ${{ matrix.toolchain }}

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ tokio = { version = "1", default-features = false, features = [ "rt-multi-thread
6666
esplora-client = { version = "0.4", default-features = false }
6767
libc = "0.2"
6868
uniffi = { version = "0.23.0", features = ["build"], optional = true }
69+
vss-client = "0.1"
6970

7071
[target.'cfg(windows)'.dependencies]
7172
winapi = { version = "0.3", features = ["winbase"] }

src/builder.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use lightning_persister::fs_store::FilesystemStore;
3737

3838
use lightning_transaction_sync::EsploraSyncClient;
3939

40+
#[cfg(any(vss, vss_test))]
41+
use crate::io::vss_store::VssStore;
4042
use bdk::bitcoin::secp256k1::Secp256k1;
4143
use bdk::blockchain::esplora::EsploraBlockchain;
4244
use bdk::database::SqliteDatabase;
@@ -269,6 +271,16 @@ impl NodeBuilder {
269271
self.build_with_store(kv_store)
270272
}
271273

274+
/// Builds a [`Node`] instance with a [`VssStore`] backend and according to the options
275+
/// previously configured.
276+
#[cfg(any(vss, vss_test))]
277+
pub fn build_with_vss_store(
278+
&self, url: &str, store_id: String,
279+
) -> Result<Node<VssStore>, BuildError> {
280+
let vss = Arc::new(VssStore::new(url, store_id));
281+
self.build_with_store(vss)
282+
}
283+
272284
/// Builds a [`Node`] instance according to the options previously configured.
273285
pub fn build_with_store<K: KVStore + Sync + Send + 'static>(
274286
&self, kv_store: Arc<K>,

src/io/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ pub mod sqlite_store;
44
#[cfg(test)]
55
pub(crate) mod test_utils;
66
pub(crate) mod utils;
7+
#[cfg(any(vss, vss_test))]
8+
pub(crate) mod vss_store;
79

810
/// The event queue will be persisted under this key.
911
pub(crate) const EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";

src/io/vss_store.rs

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
use io::Error;
2+
use std::io;
3+
use std::io::ErrorKind;
4+
#[cfg(test)]
5+
use std::panic::RefUnwindSafe;
6+
7+
use crate::io::utils::check_namespace_key_validity;
8+
use lightning::util::persist::KVStore;
9+
use tokio::runtime::Runtime;
10+
use vss_client::client::VssClient;
11+
use vss_client::error::VssError;
12+
use vss_client::types::{
13+
DeleteObjectRequest, GetObjectRequest, KeyValue, ListKeyVersionsRequest, PutObjectRequest,
14+
};
15+
16+
/// A [`KVStore`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
17+
pub struct VssStore {
18+
client: VssClient,
19+
store_id: String,
20+
runtime: Runtime,
21+
}
22+
23+
impl VssStore {
24+
pub(crate) fn new(base_url: &str, store_id: String) -> Self {
25+
let client = VssClient::new(base_url);
26+
let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
27+
Self { client, store_id, runtime }
28+
}
29+
30+
fn build_key(
31+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
32+
) -> io::Result<String> {
33+
if primary_namespace.is_empty() {
34+
Ok(key.to_string())
35+
} else {
36+
Ok(format!("{}#{}#{}", primary_namespace, secondary_namespace, key))
37+
}
38+
}
39+
40+
fn extract_key(&self, unified_key: &str) -> io::Result<String> {
41+
let mut parts = unified_key.splitn(3, '#');
42+
let (_primary_namespace, _secondary_namespace) = (parts.next(), parts.next());
43+
match parts.next() {
44+
Some(actual_key) => Ok(actual_key.to_string()),
45+
None => Err(Error::new(ErrorKind::InvalidData, "Invalid key format")),
46+
}
47+
}
48+
49+
async fn list_all_keys(
50+
&self, primary_namespace: &str, secondary_namespace: &str,
51+
) -> io::Result<Vec<String>> {
52+
let mut page_token = None;
53+
let mut keys = vec![];
54+
let key_prefix = format!("{}#{}", primary_namespace, secondary_namespace);
55+
while page_token != Some("".to_string()) {
56+
let request = ListKeyVersionsRequest {
57+
store_id: self.store_id.clone(),
58+
key_prefix: Some(key_prefix.clone()),
59+
page_token,
60+
page_size: None,
61+
};
62+
63+
let response = self.client.list_key_versions(&request).await.map_err(|e| {
64+
let msg = format!(
65+
"Failed to list keys in {}/{}: {}",
66+
primary_namespace, secondary_namespace, e
67+
);
68+
Error::new(ErrorKind::Other, msg)
69+
})?;
70+
71+
for kv in response.key_versions {
72+
keys.push(self.extract_key(&kv.key)?);
73+
}
74+
page_token = response.next_page_token;
75+
}
76+
Ok(keys)
77+
}
78+
}
79+
80+
impl KVStore for VssStore {
81+
fn read(
82+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
83+
) -> io::Result<Vec<u8>> {
84+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
85+
let request = GetObjectRequest {
86+
store_id: self.store_id.clone(),
87+
key: self.build_key(primary_namespace, secondary_namespace, key)?,
88+
};
89+
90+
let resp =
91+
tokio::task::block_in_place(|| self.runtime.block_on(self.client.get_object(&request)))
92+
.map_err(|e| {
93+
let msg = format!(
94+
"Failed to read from key {}/{}/{}: {}",
95+
primary_namespace, secondary_namespace, key, e
96+
);
97+
match e {
98+
VssError::NoSuchKeyError(..) => Error::new(ErrorKind::NotFound, msg),
99+
_ => Error::new(ErrorKind::Other, msg),
100+
}
101+
})?;
102+
Ok(resp.value.unwrap().value)
103+
}
104+
105+
fn write(
106+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
107+
) -> io::Result<()> {
108+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
109+
let request = PutObjectRequest {
110+
store_id: self.store_id.clone(),
111+
global_version: None,
112+
transaction_items: vec![KeyValue {
113+
key: self.build_key(primary_namespace, secondary_namespace, key)?,
114+
version: -1,
115+
value: buf.to_vec(),
116+
}],
117+
delete_items: vec![],
118+
};
119+
120+
tokio::task::block_in_place(|| self.runtime.block_on(self.client.put_object(&request)))
121+
.map_err(|e| {
122+
let msg = format!(
123+
"Failed to write to key {}/{}/{}: {}",
124+
primary_namespace, secondary_namespace, key, e
125+
);
126+
Error::new(ErrorKind::Other, msg)
127+
})?;
128+
129+
Ok(())
130+
}
131+
132+
fn remove(
133+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
134+
) -> io::Result<()> {
135+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
136+
let request = DeleteObjectRequest {
137+
store_id: self.store_id.clone(),
138+
key_value: Some(KeyValue {
139+
key: self.build_key(primary_namespace, secondary_namespace, key)?,
140+
version: -1,
141+
value: vec![],
142+
}),
143+
};
144+
145+
tokio::task::block_in_place(|| self.runtime.block_on(self.client.delete_object(&request)))
146+
.map_err(|e| {
147+
let msg = format!(
148+
"Failed to delete key {}/{}/{}: {}",
149+
primary_namespace, secondary_namespace, key, e
150+
);
151+
Error::new(ErrorKind::Other, msg)
152+
})?;
153+
Ok(())
154+
}
155+
156+
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
157+
check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
158+
159+
let keys = tokio::task::block_in_place(|| {
160+
self.runtime.block_on(self.list_all_keys(primary_namespace, secondary_namespace))
161+
})
162+
.map_err(|e| {
163+
let msg = format!(
164+
"Failed to retrieve keys in namespace: {}/{} : {}",
165+
primary_namespace, secondary_namespace, e
166+
);
167+
Error::new(ErrorKind::Other, msg)
168+
})?;
169+
170+
Ok(keys)
171+
}
172+
}
173+
174+
#[cfg(test)]
175+
impl RefUnwindSafe for VssStore {}
176+
177+
#[cfg(test)]
178+
#[cfg(vss_test)]
179+
mod tests {
180+
use super::*;
181+
use crate::io::test_utils::do_read_write_remove_list_persist;
182+
use rand::distributions::Alphanumeric;
183+
use rand::{thread_rng, Rng};
184+
185+
#[test]
186+
fn read_write_remove_list_persist() {
187+
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
188+
let mut rng = thread_rng();
189+
let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
190+
let vss_store = VssStore::new(&vss_base_url, rand_store_id);
191+
192+
do_read_write_remove_list_persist(&vss_store);
193+
}
194+
}

src/test/functional_tests.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,29 @@ fn channel_full_cycle() {
1818
do_channel_full_cycle(node_a, node_b, &bitcoind, &electrsd, false);
1919
}
2020

21+
#[test]
22+
#[cfg(vss_test)]
23+
fn channel_full_cycle_with_vss_store() {
24+
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
25+
println!("== Node A ==");
26+
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
27+
let config_a = random_config();
28+
let mut builder_a = NodeBuilder::from_config(config_a);
29+
builder_a.set_esplora_server(esplora_url.clone());
30+
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
31+
let node_a = builder_a.build_with_vss_store(&vss_base_url, "node_1_store".to_string()).unwrap();
32+
node_a.start().unwrap();
33+
34+
println!("\n== Node B ==");
35+
let config_b = random_config();
36+
let mut builder_b = NodeBuilder::from_config(config_b);
37+
builder_b.set_esplora_server(esplora_url);
38+
let node_b = builder_b.build_with_vss_store(&vss_base_url, "node_2_store".to_string()).unwrap();
39+
node_b.start().unwrap();
40+
41+
do_channel_full_cycle(node_a, node_b, &bitcoind, &electrsd, false);
42+
}
43+
2144
#[test]
2245
fn channel_full_cycle_0conf() {
2346
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();

0 commit comments

Comments
 (0)