Skip to content
This repository was archived by the owner on Jan 6, 2025. It is now read-only.

Commit 396bdf8

Browse files
committed
Add test for EventQueue operations
1 parent 7a1a7c0 commit 396bdf8

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ serde_json = "1.0"
2525

2626
[dev-dependencies]
2727
proptest = "1.0.0"
28+
tokio = { version = "1.35", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] }

src/events.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,112 @@ impl Future for EventFuture {
128128
}
129129
}
130130
}
131+
132+
#[cfg(test)]
133+
mod tests {
134+
use super::*;
135+
use crate::lsps0::event::LSPS0ClientEvent;
136+
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
137+
138+
#[tokio::test]
139+
#[cfg(feature = "std")]
140+
async fn event_queue_works() {
141+
use core::sync::atomic::{AtomicU16, Ordering};
142+
use std::sync::Arc;
143+
use std::time::Duration;
144+
145+
let event_queue = Arc::new(EventQueue::new());
146+
assert_eq!(event_queue.next_event(), None);
147+
148+
let secp_ctx = Secp256k1::new();
149+
let counterparty_node_id =
150+
PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap());
151+
let expected_event = Event::LSPS0Client(LSPS0ClientEvent::ListProtocolsResponse {
152+
counterparty_node_id,
153+
protocols: Vec::new(),
154+
});
155+
156+
for _ in 0..3 {
157+
event_queue.enqueue(expected_event.clone());
158+
}
159+
160+
assert_eq!(event_queue.wait_next_event(), expected_event);
161+
assert_eq!(event_queue.next_event_async().await, expected_event);
162+
assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
163+
assert_eq!(event_queue.next_event(), None);
164+
165+
// Check `next_event_async` won't return if the queue is empty and always rather timeout.
166+
tokio::select! {
167+
_ = tokio::time::sleep(Duration::from_millis(10)) => {
168+
// Timeout
169+
}
170+
_ = event_queue.next_event_async() => {
171+
panic!();
172+
}
173+
}
174+
assert_eq!(event_queue.next_event(), None);
175+
176+
// Check we get the expected number of events when polling/enqueuing concurrently.
177+
let enqueued_events = AtomicU16::new(0);
178+
let received_events = AtomicU16::new(0);
179+
let mut delayed_enqueue = false;
180+
181+
for _ in 0..25 {
182+
event_queue.enqueue(expected_event.clone());
183+
enqueued_events.fetch_add(1, Ordering::SeqCst);
184+
}
185+
186+
loop {
187+
tokio::select! {
188+
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
189+
event_queue.enqueue(expected_event.clone());
190+
enqueued_events.fetch_add(1, Ordering::SeqCst);
191+
delayed_enqueue = true;
192+
}
193+
e = event_queue.next_event_async() => {
194+
assert_eq!(e, expected_event);
195+
received_events.fetch_add(1, Ordering::SeqCst);
196+
197+
event_queue.enqueue(expected_event.clone());
198+
enqueued_events.fetch_add(1, Ordering::SeqCst);
199+
}
200+
e = event_queue.next_event_async() => {
201+
assert_eq!(e, expected_event);
202+
received_events.fetch_add(1, Ordering::SeqCst);
203+
}
204+
}
205+
206+
if delayed_enqueue
207+
&& received_events.load(Ordering::SeqCst) == enqueued_events.load(Ordering::SeqCst)
208+
{
209+
break;
210+
}
211+
}
212+
assert_eq!(event_queue.next_event(), None);
213+
214+
// Check we operate correctly, even when mixing and matching blocking and async API calls.
215+
let (tx, mut rx) = tokio::sync::watch::channel(());
216+
let thread_queue = Arc::clone(&event_queue);
217+
let thread_event = expected_event.clone();
218+
std::thread::spawn(move || {
219+
let e = thread_queue.wait_next_event();
220+
assert_eq!(e, thread_event);
221+
tx.send(()).unwrap();
222+
});
223+
224+
let thread_queue = Arc::clone(&event_queue);
225+
let thread_event = expected_event.clone();
226+
std::thread::spawn(move || {
227+
// Sleep a bit before we enqueue the events everybody is waiting for.
228+
std::thread::sleep(Duration::from_millis(20));
229+
thread_queue.enqueue(thread_event.clone());
230+
thread_queue.enqueue(thread_event.clone());
231+
});
232+
233+
let e = event_queue.next_event_async().await;
234+
assert_eq!(e, expected_event.clone());
235+
236+
rx.changed().await.unwrap();
237+
assert_eq!(event_queue.next_event(), None);
238+
}
239+
}

0 commit comments

Comments
 (0)