@@ -3,7 +3,7 @@ use crate::{
3
3
PaymentInfo , PaymentInfoStorage , PaymentStatus , Wallet ,
4
4
} ;
5
5
6
- use crate :: io:: { KVStoreUnpersister , EVENT_QUEUE_PERSISTENCE_KEY } ;
6
+ use crate :: io:: { KVStore , EVENT_QUEUE_PERSISTENCE_KEY , EVENT_QUEUE_PERSISTENCE_NAMESPACE } ;
7
7
use crate :: logger:: { log_error, log_info, Logger } ;
8
8
9
9
use lightning:: chain:: chaininterface:: { BroadcasterInterface , ConfirmationTarget , FeeEstimator } ;
@@ -14,7 +14,6 @@ use lightning::util::errors::APIError;
14
14
use lightning:: util:: events:: Event as LdkEvent ;
15
15
use lightning:: util:: events:: EventHandler as LdkEventHandler ;
16
16
use lightning:: util:: events:: PaymentPurpose ;
17
- use lightning:: util:: persist:: KVStorePersister ;
18
17
use lightning:: util:: ser:: { Readable , ReadableArgs , Writeable , Writer } ;
19
18
20
19
use bitcoin:: secp256k1:: Secp256k1 ;
@@ -85,21 +84,21 @@ impl_writeable_tlv_based_enum!(Event,
85
84
86
85
pub struct EventQueue < K : Deref >
87
86
where
88
- K :: Target : KVStorePersister ,
87
+ K :: Target : KVStore ,
89
88
{
90
89
queue : Mutex < VecDeque < Event > > ,
91
90
notifier : Condvar ,
92
- persister : K ,
91
+ kv_store : K ,
93
92
}
94
93
95
94
impl < K : Deref > EventQueue < K >
96
95
where
97
- K :: Target : KVStorePersister ,
96
+ K :: Target : KVStore ,
98
97
{
99
- pub ( crate ) fn new ( persister : K ) -> Self {
98
+ pub ( crate ) fn new ( kv_store : K ) -> Self {
100
99
let queue: Mutex < VecDeque < Event > > = Mutex :: new ( VecDeque :: new ( ) ) ;
101
100
let notifier = Condvar :: new ( ) ;
102
- Self { queue, notifier, persister }
101
+ Self { queue, notifier, kv_store }
103
102
}
104
103
105
104
pub ( crate ) fn add_event ( & self , event : Event ) -> Result < ( ) , Error > {
@@ -130,25 +129,30 @@ where
130
129
}
131
130
132
131
fn persist_queue ( & self , locked_queue : & VecDeque < Event > ) -> Result < ( ) , Error > {
133
- self . persister
134
- . persist ( EVENT_QUEUE_PERSISTENCE_KEY , & EventQueueSerWrapper ( locked_queue) )
132
+ let mut writer = self
133
+ . kv_store
134
+ . write ( EVENT_QUEUE_PERSISTENCE_NAMESPACE , EVENT_QUEUE_PERSISTENCE_KEY )
135
135
. map_err ( |_| Error :: PersistenceFailed ) ?;
136
+ EventQueueSerWrapper ( locked_queue)
137
+ . write ( & mut writer)
138
+ . map_err ( |_| Error :: PersistenceFailed ) ?;
139
+ writer. commit ( ) . map_err ( |_| Error :: PersistenceFailed ) ?;
136
140
Ok ( ( ) )
137
141
}
138
142
}
139
143
140
144
impl < K : Deref > ReadableArgs < K > for EventQueue < K >
141
145
where
142
- K :: Target : KVStorePersister ,
146
+ K :: Target : KVStore ,
143
147
{
144
148
#[ inline]
145
149
fn read < R : lightning:: io:: Read > (
146
- reader : & mut R , persister : K ,
150
+ reader : & mut R , kv_store : K ,
147
151
) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
148
152
let read_queue: EventQueueDeserWrapper = Readable :: read ( reader) ?;
149
153
let queue: Mutex < VecDeque < Event > > = Mutex :: new ( read_queue. 0 ) ;
150
154
let notifier = Condvar :: new ( ) ;
151
- Ok ( Self { queue, notifier, persister } )
155
+ Ok ( Self { queue, notifier, kv_store } )
152
156
}
153
157
}
154
158
@@ -181,7 +185,7 @@ impl Writeable for EventQueueSerWrapper<'_> {
181
185
182
186
pub ( crate ) struct EventHandler < K : Deref + Clone , L : Deref >
183
187
where
184
- K :: Target : KVStorePersister + KVStoreUnpersister ,
188
+ K :: Target : KVStore ,
185
189
L :: Target : Logger ,
186
190
{
187
191
wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > ,
@@ -197,7 +201,7 @@ where
197
201
198
202
impl < K : Deref + Clone , L : Deref > EventHandler < K , L >
199
203
where
200
- K :: Target : KVStorePersister + KVStoreUnpersister ,
204
+ K :: Target : KVStore ,
201
205
L :: Target : Logger ,
202
206
{
203
207
pub fn new (
@@ -222,7 +226,7 @@ where
222
226
223
227
impl < K : Deref + Clone , L : Deref > LdkEventHandler for EventHandler < K , L >
224
228
where
225
- K :: Target : KVStorePersister + KVStoreUnpersister ,
229
+ K :: Target : KVStore ,
226
230
L :: Target : Logger ,
227
231
{
228
232
fn handle_event ( & self , event : LdkEvent ) {
@@ -551,33 +555,35 @@ where
551
555
#[ cfg( test) ]
552
556
mod tests {
553
557
use super :: * ;
554
- use crate :: test:: utils:: TestPersister ;
558
+ use crate :: test:: utils:: TestStore ;
555
559
556
560
#[ test]
557
561
fn event_queue_persistence ( ) {
558
- let persister = Arc :: new ( TestPersister :: new ( ) ) ;
559
- let event_queue = EventQueue :: new ( Arc :: clone ( & persister ) ) ;
562
+ let store = Arc :: new ( TestStore :: new ( ) ) ;
563
+ let event_queue = EventQueue :: new ( Arc :: clone ( & store ) ) ;
560
564
561
565
let expected_event = Event :: ChannelReady { channel_id : [ 23u8 ; 32 ] , user_channel_id : 2323 } ;
562
566
event_queue. add_event ( expected_event. clone ( ) ) . unwrap ( ) ;
563
- assert ! ( persister . get_and_clear_did_persist( ) ) ;
567
+ assert ! ( store . get_and_clear_did_persist( ) ) ;
564
568
565
569
// Check we get the expected event and that it is returned until we mark it handled.
566
570
for _ in 0 ..5 {
567
571
assert_eq ! ( event_queue. next_event( ) , expected_event) ;
568
- assert_eq ! ( false , persister . get_and_clear_did_persist( ) ) ;
572
+ assert_eq ! ( false , store . get_and_clear_did_persist( ) ) ;
569
573
}
570
574
571
575
// Check we can read back what we persisted.
572
- let persisted_bytes = persister. get_persisted_bytes ( EVENT_QUEUE_PERSISTENCE_KEY ) . unwrap ( ) ;
576
+ let persisted_bytes = store
577
+ . get_persisted_bytes ( EVENT_QUEUE_PERSISTENCE_NAMESPACE , EVENT_QUEUE_PERSISTENCE_KEY )
578
+ . unwrap ( ) ;
573
579
let deser_event_queue =
574
- EventQueue :: read ( & mut & persisted_bytes[ ..] , Arc :: clone ( & persister ) ) . unwrap ( ) ;
580
+ EventQueue :: read ( & mut & persisted_bytes[ ..] , Arc :: clone ( & store ) ) . unwrap ( ) ;
575
581
assert_eq ! ( deser_event_queue. next_event( ) , expected_event) ;
576
- assert ! ( !persister . get_and_clear_did_persist( ) ) ;
582
+ assert ! ( !store . get_and_clear_did_persist( ) ) ;
577
583
578
584
// Check we persisted on `event_handled()`
579
585
event_queue. event_handled ( ) . unwrap ( ) ;
580
586
581
- assert ! ( persister . get_and_clear_did_persist( ) ) ;
587
+ assert ! ( store . get_and_clear_did_persist( ) ) ;
582
588
}
583
589
}
0 commit comments