Skip to content
This repository was archived by the owner on Jan 6, 2025. It is now read-only.

Commit 7a1a7c0

Browse files
committed
Implement next_event_async method polling the event queue
We implement a way to asynchronously poll the queue for new events, providing an async alternative to `wait_next_event`.
1 parent 6071465 commit 7a1a7c0

File tree

2 files changed

+48
-5
lines changed

2 files changed

+48
-5
lines changed

src/events.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,29 @@ use crate::lsps0;
2020
use crate::lsps1;
2121
use crate::lsps2;
2222
use crate::prelude::{Vec, VecDeque};
23-
use crate::sync::Mutex;
23+
use crate::sync::{Arc, Mutex};
24+
25+
use core::future::Future;
26+
use core::task::{Poll, Waker};
2427

2528
pub(crate) struct EventQueue {
26-
queue: Mutex<VecDeque<Event>>,
29+
queue: Arc<Mutex<VecDeque<Event>>>,
30+
waker: Arc<Mutex<Option<Waker>>>,
2731
#[cfg(feature = "std")]
2832
condvar: std::sync::Condvar,
2933
}
3034

3135
impl EventQueue {
3236
pub fn new() -> Self {
33-
let queue = Mutex::new(VecDeque::new());
37+
let queue = Arc::new(Mutex::new(VecDeque::new()));
38+
let waker = Arc::new(Mutex::new(None));
3439
#[cfg(feature = "std")]
3540
{
3641
let condvar = std::sync::Condvar::new();
37-
Self { queue, condvar }
42+
Self { queue, waker, condvar }
3843
}
3944
#[cfg(not(feature = "std"))]
40-
Self { queue }
45+
Self { queue, waker }
4146
}
4247

4348
pub fn enqueue(&self, event: Event) {
@@ -46,6 +51,9 @@ impl EventQueue {
4651
queue.push_back(event);
4752
}
4853

54+
if let Some(waker) = self.waker.lock().unwrap().take() {
55+
waker.wake();
56+
}
4957
#[cfg(feature = "std")]
5058
self.condvar.notify_one();
5159
}
@@ -54,6 +62,10 @@ impl EventQueue {
5462
self.queue.lock().unwrap().pop_front()
5563
}
5664

65+
pub async fn next_event_async(&self) -> Event {
66+
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
67+
}
68+
5769
#[cfg(feature = "std")]
5870
pub fn wait_next_event(&self) -> Event {
5971
let mut queue =
@@ -65,6 +77,10 @@ impl EventQueue {
6577
drop(queue);
6678

6779
if should_notify {
80+
if let Some(waker) = self.waker.lock().unwrap().take() {
81+
waker.wake();
82+
}
83+
6884
self.condvar.notify_one();
6985
}
7086

@@ -92,3 +108,23 @@ pub enum Event {
92108
/// An LSPS2 (JIT Channel) server event.
93109
LSPS2Service(lsps2::event::LSPS2ServiceEvent),
94110
}
111+
112+
struct EventFuture {
113+
event_queue: Arc<Mutex<VecDeque<Event>>>,
114+
waker: Arc<Mutex<Option<Waker>>>,
115+
}
116+
117+
impl Future for EventFuture {
118+
type Output = Event;
119+
120+
fn poll(
121+
self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
122+
) -> core::task::Poll<Self::Output> {
123+
if let Some(event) = self.event_queue.lock().unwrap().pop_front() {
124+
Poll::Ready(event)
125+
} else {
126+
*self.waker.lock().unwrap() = Some(cx.waker().clone());
127+
Poll::Pending
128+
}
129+
}
130+
}

src/manager.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,13 @@ where {
305305
self.pending_events.next_event()
306306
}
307307

308+
/// Asynchronously polls the event queue and returns once the next event is ready.
309+
///
310+
/// Typically you would spawn a thread or task that calls this in a loop.
311+
pub async fn next_event_async(&self) -> Event {
312+
self.pending_events.next_event_async().await
313+
}
314+
308315
/// Returns and clears all events without blocking.
309316
///
310317
/// Typically you would spawn a thread or task that calls this in a loop.

0 commit comments

Comments
 (0)