1
- use std :: io:: Cursor ;
1
+ use io:: Error ;
2
2
use std:: io;
3
+ use std:: io:: ErrorKind ;
4
+ #[ cfg( test) ]
5
+ use std:: panic:: RefUnwindSafe ;
3
6
4
- use lightning :: util :: persist :: KVStorePersister ;
5
- use lightning:: util:: ser :: Writeable ;
7
+ use crate :: io :: utils :: check_namespace_key_validity ;
8
+ use lightning:: util:: persist :: KVStore ;
6
9
use tokio:: runtime:: Runtime ;
7
10
use vss_client:: client:: VssClient ;
8
11
use vss_client:: error:: VssError ;
9
12
use vss_client:: types:: {
10
13
DeleteObjectRequest , GetObjectRequest , KeyValue , ListKeyVersionsRequest , PutObjectRequest ,
11
14
} ;
12
15
13
- use crate :: io:: get_namespace_and_key_from_prefixed;
14
- use crate :: KVStore ;
15
-
16
16
/// A [`KVStore`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
17
17
pub struct VssStore {
18
18
client : VssClient ,
@@ -21,43 +21,54 @@ pub struct VssStore {
21
21
}
22
22
23
23
impl VssStore {
24
+ #[ cfg( feature = "vss" ) ]
24
25
pub ( crate ) fn new ( base_url : & str , store_id : String ) -> Self {
25
26
let client = VssClient :: new ( base_url) ;
26
27
let runtime = tokio:: runtime:: Builder :: new_multi_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) ;
27
28
Self { client, store_id, runtime }
28
29
}
29
30
30
- fn build_key ( & self , namespace : & str , key : & str ) -> io:: Result < String > {
31
+ fn build_key ( & self , namespace : & str , sub_namespace : & str , key : & str ) -> io:: Result < String > {
31
32
if key. is_empty ( ) {
32
- return Err ( io :: Error :: new ( io :: ErrorKind :: Other , "Empty key is not allowed" ) ) ;
33
+ return Err ( Error :: new ( ErrorKind :: Other , "Empty key is not allowed" ) ) ;
33
34
}
35
+ // But namespace and sub_namespace can be empty
34
36
if namespace. is_empty ( ) {
35
37
Ok ( key. to_string ( ) )
36
38
} else {
37
- Ok ( format ! ( "{}/{} " , namespace, key) )
39
+ Ok ( format ! ( "{}#{}#{} " , namespace, sub_namespace , key) )
38
40
}
39
41
}
40
42
41
- fn split_key ( & self , key : & str ) -> ( String , String ) {
42
- get_namespace_and_key_from_prefixed ( key) . unwrap ( )
43
+ fn split_key ( & self , key : & str ) -> io:: Result < ( String , String , String ) > {
44
+ let parts: Vec < & str > = key. split ( '#' ) . collect ( ) ;
45
+ match parts. as_slice ( ) {
46
+ [ namespace, sub_namespace, actual_key] => {
47
+ Ok ( ( namespace. to_string ( ) , sub_namespace. to_string ( ) , actual_key. to_string ( ) ) )
48
+ }
49
+ _ => Err ( Error :: new ( ErrorKind :: InvalidData , "Invalid key format" ) ) ,
50
+ }
43
51
}
44
52
45
- async fn list_all_keys ( & self , namespace : & str ) -> Result < Vec < String > , VssError > {
53
+ async fn list_all_keys ( & self , namespace : & str , sub_namespace : & str ) -> io :: Result < Vec < String > > {
46
54
let mut page_token = None ;
47
55
let mut keys = vec ! [ ] ;
48
-
56
+ let key_prefix = format ! ( "{}#{}" , namespace , sub_namespace ) ;
49
57
while page_token != Some ( "" . to_string ( ) ) {
50
58
let request = ListKeyVersionsRequest {
51
59
store_id : self . store_id . to_string ( ) ,
52
- key_prefix : Some ( namespace . to_string ( ) ) ,
60
+ key_prefix : Some ( key_prefix . to_string ( ) ) ,
53
61
page_token,
54
62
page_size : None ,
55
63
} ;
56
64
57
- let response = self . client . list_key_versions ( & request) . await ?;
65
+ let response = self . client . list_key_versions ( & request) . await . map_err ( |e| {
66
+ let msg = format ! ( "Failed to list keys in {}/{}: {}" , namespace, sub_namespace, e) ;
67
+ Error :: new ( ErrorKind :: Other , msg)
68
+ } ) ?;
58
69
59
70
for kv in response. key_versions {
60
- keys. push ( self . split_key ( & kv. key ) . 1 ) ;
71
+ keys. push ( self . split_key ( & kv. key ) ? . 2 ) ;
61
72
}
62
73
page_token = response. next_page_token ;
63
74
}
@@ -66,37 +77,38 @@ impl VssStore {
66
77
}
67
78
68
79
impl KVStore for VssStore {
69
- type Reader = Cursor < Vec < u8 > > ;
70
-
71
- fn read ( & self , namespace : & str , key : & str ) -> io:: Result < Self :: Reader > {
80
+ fn read ( & self , namespace : & str , sub_namespace : & str , key : & str ) -> io:: Result < Vec < u8 > > {
81
+ check_namespace_key_validity ( namespace, sub_namespace, Some ( key) , "read" ) ?;
72
82
let request = GetObjectRequest {
73
83
store_id : self . store_id . to_string ( ) ,
74
- key : self . build_key ( namespace, key) ?,
84
+ key : self . build_key ( namespace, sub_namespace , key) ?,
75
85
} ;
76
-
86
+ // self.runtime.spawn()
77
87
let resp =
78
88
tokio:: task:: block_in_place ( || self . runtime . block_on ( self . client . get_object ( & request) ) )
79
- . map_err ( |e| {
80
- match e {
81
- VssError :: NoSuchKeyError ( ..) => {
82
- let msg = format ! ( "Failed to read as key could not be found: {}/{}. Details: {}" , namespace, key, e) ;
83
- io:: Error :: new ( io:: ErrorKind :: NotFound , msg)
84
- }
85
- _ => {
86
- let msg = format ! ( "Failed to read from key {}/{}: {}" , namespace, key, e) ;
87
- io:: Error :: new ( io:: ErrorKind :: Other , msg)
88
- }
89
+ . map_err ( |e| match e {
90
+ VssError :: NoSuchKeyError ( ..) => {
91
+ let msg = format ! (
92
+ "Failed to read as key could not be found: {}/{}. Details: {}" ,
93
+ namespace, key, e
94
+ ) ;
95
+ Error :: new ( ErrorKind :: NotFound , msg)
96
+ }
97
+ _ => {
98
+ let msg = format ! ( "Failed to read from key {}/{}: {}" , namespace, key, e) ;
99
+ Error :: new ( ErrorKind :: Other , msg)
89
100
}
90
101
} ) ?;
91
- Ok ( Cursor :: new ( resp. value . unwrap ( ) . value ) )
102
+ Ok ( resp. value . unwrap ( ) . value )
92
103
}
93
104
94
- fn write ( & self , namespace : & str , key : & str , buf : & [ u8 ] ) -> io:: Result < ( ) > {
105
+ fn write ( & self , namespace : & str , sub_namespace : & str , key : & str , buf : & [ u8 ] ) -> io:: Result < ( ) > {
106
+ check_namespace_key_validity ( namespace, sub_namespace, Some ( key) , "write" ) ?;
95
107
let request = PutObjectRequest {
96
108
store_id : self . store_id . to_string ( ) ,
97
109
global_version : None ,
98
110
transaction_items : vec ! [ KeyValue {
99
- key: self . build_key( namespace, key) ?,
111
+ key: self . build_key( namespace, sub_namespace , key) ?,
100
112
version: -1 ,
101
113
value: buf. to_vec( ) ,
102
114
} ] ,
@@ -106,17 +118,20 @@ impl KVStore for VssStore {
106
118
tokio:: task:: block_in_place ( || self . runtime . block_on ( self . client . put_object ( & request) ) )
107
119
. map_err ( |e| {
108
120
let msg = format ! ( "Failed to write to key {}/{}: {}" , namespace, key, e) ;
109
- io :: Error :: new ( io :: ErrorKind :: Other , msg)
121
+ Error :: new ( ErrorKind :: Other , msg)
110
122
} ) ?;
111
123
112
124
Ok ( ( ) )
113
125
}
114
126
115
- fn remove ( & self , namespace : & str , key : & str ) -> io:: Result < ( ) > {
127
+ fn remove (
128
+ & self , namespace : & str , sub_namespace : & str , key : & str , _lazy : bool ,
129
+ ) -> io:: Result < ( ) > {
130
+ check_namespace_key_validity ( namespace, sub_namespace, Some ( key) , "remove" ) ?;
116
131
let request = DeleteObjectRequest {
117
132
store_id : self . store_id . to_string ( ) ,
118
133
key_value : Some ( KeyValue {
119
- key : self . build_key ( namespace, key) ?,
134
+ key : self . build_key ( namespace, sub_namespace , key) ?,
120
135
version : -1 ,
121
136
value : vec ! [ ] ,
122
137
} ) ,
@@ -125,54 +140,44 @@ impl KVStore for VssStore {
125
140
tokio:: task:: block_in_place ( || self . runtime . block_on ( self . client . delete_object ( & request) ) )
126
141
. map_err ( |e| {
127
142
let msg = format ! ( "Failed to delete key {}/{}: {}" , namespace, key, e) ;
128
- io :: Error :: new ( io :: ErrorKind :: Other , msg)
143
+ Error :: new ( ErrorKind :: Other , msg)
129
144
} ) ?;
130
145
Ok ( ( ) )
131
146
}
132
147
133
- fn list ( & self , namespace : & str ) -> io:: Result < Vec < String > > {
134
- let keys =
135
- tokio:: task:: block_in_place ( || self . runtime . block_on ( self . list_all_keys ( namespace) ) )
136
- . map_err ( |e| {
137
- let msg =
138
- format ! ( "Failed to retrieve keys in namespace: {} : {}" , namespace, e) ;
139
- io:: Error :: new ( io:: ErrorKind :: Other , msg)
140
- } ) ?;
148
+ fn list ( & self , namespace : & str , sub_namespace : & str ) -> io:: Result < Vec < String > > {
149
+ check_namespace_key_validity ( namespace, sub_namespace, None , "list" ) ?;
150
+
151
+ let keys = tokio:: task:: block_in_place ( || {
152
+ self . runtime . block_on ( self . list_all_keys ( namespace, sub_namespace) )
153
+ } )
154
+ . map_err ( |e| {
155
+ let msg = format ! ( "Failed to retrieve keys in namespace: {} : {}" , namespace, e) ;
156
+ Error :: new ( ErrorKind :: Other , msg)
157
+ } ) ?;
141
158
142
159
Ok ( keys)
143
160
}
144
161
}
145
162
146
- impl KVStorePersister for VssStore {
147
- fn persist < W : Writeable > ( & self , prefixed_key : & str , object : & W ) -> io:: Result < ( ) > {
148
- let ( namespace, key) = self . split_key ( prefixed_key) ;
149
- self . write ( & namespace, & key, & object. encode ( ) ) ?;
150
- Ok ( ( ) )
151
- }
152
- }
163
+ #[ cfg( test) ]
164
+ impl RefUnwindSafe for VssStore { }
153
165
154
166
#[ cfg( test) ]
167
+ #[ cfg( feature = "vss-test" ) ]
155
168
mod tests {
156
- use proptest:: prelude:: * ;
157
-
158
- use crate :: io:: do_read_write_remove_list_persist;
159
- use crate :: test:: utils:: random_storage_path;
160
-
161
169
use super :: * ;
162
-
163
- proptest ! {
164
- #[ test]
165
- fn read_write_remove_list_persist( data in any:: <[ u8 ; 32 ] >( ) ) {
166
- let vss_base_url = std:: env:: var( "TEST_VSS_BASE_URL" ) ;
167
- if vss_base_url. is_ok( )
168
- {
169
- let rand_store_id = random_storage_path( ) ;
170
- let vss_store = VssStore :: new( & vss_base_url. unwrap( ) , rand_store_id) ;
171
-
172
- do_read_write_remove_list_persist( & data, & vss_store) ;
173
- } else{
174
- eprintln!( "** SKIPPING `VssStore` test-suite since environment variable `TEST_VSS_BASE_URL` is not set **" ) ;
175
- }
176
- }
170
+ use crate :: io:: test_utils:: do_read_write_remove_list_persist;
171
+ use rand:: distributions:: Alphanumeric ;
172
+ use rand:: { thread_rng, Rng } ;
173
+
174
+ #[ test]
175
+ fn read_write_remove_list_persist ( ) {
176
+ let vss_base_url = std:: env:: var ( "TEST_VSS_BASE_URL" ) . unwrap ( ) ;
177
+ let mut rng = thread_rng ( ) ;
178
+ let rand_store_id: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
179
+ let vss_store = VssStore :: new ( & vss_base_url, rand_store_id) ;
180
+
181
+ do_read_write_remove_list_persist ( & vss_store) ;
177
182
}
178
183
}
0 commit comments