@@ -84,30 +84,33 @@ impl_writeable_tlv_based_enum!(Event,
84
84
} ;
85
85
) ;
86
86
87
- pub struct EventQueue < K : Deref >
87
+ pub struct EventQueue < K : Deref , L : Deref >
88
88
where
89
89
K :: Target : KVStore ,
90
+ L :: Target : Logger ,
90
91
{
91
92
queue : Mutex < VecDeque < Event > > ,
92
93
notifier : Condvar ,
93
94
kv_store : K ,
95
+ logger : L ,
94
96
}
95
97
96
- impl < K : Deref > EventQueue < K >
98
+ impl < K : Deref , L : Deref > EventQueue < K , L >
97
99
where
98
100
K :: Target : KVStore ,
101
+ L :: Target : Logger ,
99
102
{
100
- pub ( crate ) fn new ( kv_store : K ) -> Self {
103
+ pub ( crate ) fn new ( kv_store : K , logger : L ) -> Self {
101
104
let queue: Mutex < VecDeque < Event > > = Mutex :: new ( VecDeque :: new ( ) ) ;
102
105
let notifier = Condvar :: new ( ) ;
103
- Self { queue, notifier, kv_store }
106
+ Self { queue, notifier, kv_store, logger }
104
107
}
105
108
106
109
pub ( crate ) fn add_event ( & self , event : Event ) -> Result < ( ) , Error > {
107
110
{
108
111
let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
109
112
locked_queue. push_back ( event) ;
110
- self . persist_queue ( & locked_queue) ?;
113
+ self . write_queue_and_commit ( & locked_queue) ?;
111
114
}
112
115
113
116
self . notifier . notify_one ( ) ;
@@ -124,37 +127,64 @@ where
124
127
{
125
128
let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
126
129
locked_queue. pop_front ( ) ;
127
- self . persist_queue ( & locked_queue) ?;
130
+ self . write_queue_and_commit ( & locked_queue) ?;
128
131
}
129
132
self . notifier . notify_one ( ) ;
130
133
Ok ( ( ) )
131
134
}
132
135
133
- fn persist_queue ( & self , locked_queue : & VecDeque < Event > ) -> Result < ( ) , Error > {
136
+ fn write_queue_and_commit ( & self , locked_queue : & VecDeque < Event > ) -> Result < ( ) , Error > {
134
137
let mut writer = self
135
138
. kv_store
136
139
. write ( EVENT_QUEUE_PERSISTENCE_NAMESPACE , EVENT_QUEUE_PERSISTENCE_KEY )
137
- . map_err ( |_| Error :: PersistenceFailed ) ?;
138
- EventQueueSerWrapper ( locked_queue)
139
- . write ( & mut writer)
140
- . map_err ( |_| Error :: PersistenceFailed ) ?;
141
- writer. commit ( ) . map_err ( |_| Error :: PersistenceFailed ) ?;
140
+ . map_err ( |e| {
141
+ log_error ! (
142
+ self . logger,
143
+ "Getting writer for key {}/{} failed due to: {}" ,
144
+ EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
145
+ EVENT_QUEUE_PERSISTENCE_KEY ,
146
+ e
147
+ ) ;
148
+ Error :: PersistenceFailed
149
+ } ) ?;
150
+ EventQueueSerWrapper ( locked_queue) . write ( & mut writer) . map_err ( |e| {
151
+ log_error ! (
152
+ self . logger,
153
+ "Writing event queue data to key {}/{} failed due to: {}" ,
154
+ EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
155
+ EVENT_QUEUE_PERSISTENCE_KEY ,
156
+ e
157
+ ) ;
158
+ Error :: PersistenceFailed
159
+ } ) ?;
160
+ writer. commit ( ) . map_err ( |e| {
161
+ log_error ! (
162
+ self . logger,
163
+ "Committing event queue data to key {}/{} failed due to: {}" ,
164
+ EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
165
+ EVENT_QUEUE_PERSISTENCE_KEY ,
166
+ e
167
+ ) ;
168
+ Error :: PersistenceFailed
169
+ } ) ?;
142
170
Ok ( ( ) )
143
171
}
144
172
}
145
173
146
- impl < K : Deref > ReadableArgs < K > for EventQueue < K >
174
+ impl < K : Deref , L : Deref > ReadableArgs < ( K , L ) > for EventQueue < K , L >
147
175
where
148
176
K :: Target : KVStore ,
177
+ L :: Target : Logger ,
149
178
{
150
179
#[ inline]
151
180
fn read < R : lightning:: io:: Read > (
152
- reader : & mut R , kv_store : K ,
181
+ reader : & mut R , args : ( K , L ) ,
153
182
) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
183
+ let ( kv_store, logger) = args;
154
184
let read_queue: EventQueueDeserWrapper = Readable :: read ( reader) ?;
155
185
let queue: Mutex < VecDeque < Event > > = Mutex :: new ( read_queue. 0 ) ;
156
186
let notifier = Condvar :: new ( ) ;
157
- Ok ( Self { queue, notifier, kv_store } )
187
+ Ok ( Self { queue, notifier, kv_store, logger } )
158
188
}
159
189
}
160
190
@@ -191,11 +221,11 @@ where
191
221
L :: Target : Logger ,
192
222
{
193
223
wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > ,
194
- event_queue : Arc < EventQueue < K > > ,
224
+ event_queue : Arc < EventQueue < K , L > > ,
195
225
channel_manager : Arc < ChannelManager > ,
196
226
network_graph : Arc < NetworkGraph > ,
197
227
keys_manager : Arc < KeysManager > ,
198
- payment_store : Arc < PaymentInfoStorage < K > > ,
228
+ payment_store : Arc < PaymentInfoStorage < K , L > > ,
199
229
tokio_runtime : Arc < tokio:: runtime:: Runtime > ,
200
230
logger : L ,
201
231
_config : Arc < Config > ,
@@ -207,9 +237,9 @@ where
207
237
L :: Target : Logger ,
208
238
{
209
239
pub fn new (
210
- wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > , event_queue : Arc < EventQueue < K > > ,
240
+ wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > , event_queue : Arc < EventQueue < K , L > > ,
211
241
channel_manager : Arc < ChannelManager > , network_graph : Arc < NetworkGraph > ,
212
- keys_manager : Arc < KeysManager > , payment_store : Arc < PaymentInfoStorage < K > > ,
242
+ keys_manager : Arc < KeysManager > , payment_store : Arc < PaymentInfoStorage < K , L > > ,
213
243
tokio_runtime : Arc < tokio:: runtime:: Runtime > , logger : L , _config : Arc < Config > ,
214
244
) -> Self {
215
245
Self {
@@ -557,12 +587,13 @@ where
557
587
#[ cfg( test) ]
558
588
mod tests {
559
589
use super :: * ;
560
- use crate :: test:: utils:: TestStore ;
590
+ use crate :: test:: utils:: { TestLogger , TestStore } ;
561
591
562
592
#[ test]
563
593
fn event_queue_persistence ( ) {
564
594
let store = Arc :: new ( TestStore :: new ( ) ) ;
565
- let event_queue = EventQueue :: new ( Arc :: clone ( & store) ) ;
595
+ let logger = Arc :: new ( TestLogger :: new ( ) ) ;
596
+ let event_queue = EventQueue :: new ( Arc :: clone ( & store) , Arc :: clone ( & logger) ) ;
566
597
567
598
let expected_event = Event :: ChannelReady { channel_id : [ 23u8 ; 32 ] , user_channel_id : 2323 } ;
568
599
event_queue. add_event ( expected_event. clone ( ) ) . unwrap ( ) ;
@@ -579,7 +610,7 @@ mod tests {
579
610
. get_persisted_bytes ( EVENT_QUEUE_PERSISTENCE_NAMESPACE , EVENT_QUEUE_PERSISTENCE_KEY )
580
611
. unwrap ( ) ;
581
612
let deser_event_queue =
582
- EventQueue :: read ( & mut & persisted_bytes[ ..] , Arc :: clone ( & store) ) . unwrap ( ) ;
613
+ EventQueue :: read ( & mut & persisted_bytes[ ..] , ( Arc :: clone ( & store) , logger ) ) . unwrap ( ) ;
583
614
assert_eq ! ( deser_event_queue. next_event( ) , expected_event) ;
584
615
assert ! ( !store. get_and_clear_did_persist( ) ) ;
585
616
0 commit comments