Skip to content

Commit 49e4be9

Browse files
authored
Merge pull request #250 from Berrysoft/refactor/reduce-eventfd-io
feat(driver): reduce eventfd IO
2 parents 40c7a3b + 8dbaf5d commit 49e4be9

File tree

4 files changed

+39
-14
lines changed

4 files changed

+39
-14
lines changed

compio-runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ criterion = { workspace = true, optional = true }
3939
crossbeam-queue = { workspace = true }
4040
futures-util = { workspace = true }
4141
once_cell = { workspace = true }
42+
send_wrapper = "0.6.0"
4243
slab = { workspace = true, optional = true }
4344
smallvec = "1.11.1"
4445
socket2 = { workspace = true }

compio-runtime/src/runtime/mod.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{
22
any::Any,
33
cell::RefCell,
4+
collections::VecDeque,
45
future::{ready, Future},
56
io,
67
panic::AssertUnwindSafe,
@@ -18,6 +19,7 @@ use compio_driver::{
1819
use compio_log::{debug, instrument};
1920
use crossbeam_queue::SegQueue;
2021
use futures_util::{future::Either, FutureExt};
22+
use send_wrapper::SendWrapper;
2123
use smallvec::SmallVec;
2224

2325
pub(crate) mod op;
@@ -46,7 +48,8 @@ impl Default for FutureState {
4648

4749
pub(crate) struct RuntimeInner {
4850
driver: RefCell<Proactor>,
49-
runnables: Arc<SegQueue<Runnable>>,
51+
local_runnables: Arc<SendWrapper<RefCell<VecDeque<Runnable>>>>,
52+
sync_runnables: Arc<SegQueue<Runnable>>,
5053
op_runtime: RefCell<OpRuntime>,
5154
#[cfg(feature = "time")]
5255
timer_runtime: RefCell<TimerRuntime>,
@@ -56,7 +59,8 @@ impl RuntimeInner {
5659
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
5760
Ok(Self {
5861
driver: RefCell::new(builder.build()?),
59-
runnables: Arc::new(SegQueue::new()),
62+
local_runnables: Arc::new(SendWrapper::new(RefCell::new(VecDeque::new()))),
63+
sync_runnables: Arc::new(SegQueue::new()),
6064
op_runtime: RefCell::default(),
6165
#[cfg(feature = "time")]
6266
timer_runtime: RefCell::new(TimerRuntime::new()),
@@ -65,27 +69,42 @@ impl RuntimeInner {
6569

6670
// Safety: be careful about the captured lifetime.
6771
pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> Task<F::Output> {
68-
let runnables = self.runnables.clone();
72+
let local_runnables = self.local_runnables.clone();
73+
let sync_runnables = self.sync_runnables.clone();
6974
let handle = self
7075
.driver
7176
.borrow()
7277
.handle()
7378
.expect("cannot create notify handle of the proactor");
7479
let schedule = move |runnable| {
75-
runnables.push(runnable);
76-
handle.notify().ok();
80+
if local_runnables.valid() {
81+
local_runnables.borrow_mut().push_back(runnable);
82+
} else {
83+
sync_runnables.push(runnable);
84+
handle.notify().ok();
85+
}
7786
};
7887
let (runnable, task) = async_task::spawn_unchecked(future, schedule);
7988
runnable.schedule();
8089
task
8190
}
8291

8392
pub fn run(&self) {
93+
use std::ops::Deref;
94+
95+
let local_runnables = self.local_runnables.deref().deref();
8496
loop {
85-
let next_task = self.runnables.pop();
97+
let next_task = local_runnables.borrow_mut().pop_front();
98+
let has_local_task = next_task.is_some();
8699
if let Some(task) = next_task {
87100
task.run();
88-
} else {
101+
}
102+
let next_task = self.sync_runnables.pop();
103+
let has_sync_task = next_task.is_some();
104+
if let Some(task) = next_task {
105+
task.run();
106+
}
107+
if !has_local_task && !has_sync_task {
89108
break;
90109
}
91110
}

compio-runtime/src/runtime/time.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ impl TimerRuntime {
8282
}
8383

8484
pub fn min_timeout(&self) -> Option<Duration> {
85-
let elapsed = self.time.elapsed();
8685
self.wheel.peek().map(|entry| {
86+
let elapsed = self.time.elapsed();
8787
if entry.0.delay > elapsed {
8888
entry.0.delay - elapsed
8989
} else {

compio-runtime/tests/custom_loop.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ fn cf_run_loop() {
1414
use core_foundation::{
1515
base::TCFType,
1616
filedescriptor::{kCFFileDescriptorReadCallBack, CFFileDescriptor, CFFileDescriptorRef},
17-
runloop::{kCFRunLoopDefaultMode, CFRunLoop, CFRunLoopRef},
17+
runloop::{kCFRunLoopDefaultMode, CFRunLoop, CFRunLoopRef, CFRunLoopStop},
1818
string::CFStringRef,
1919
};
2020

@@ -52,12 +52,13 @@ fn cf_run_loop() {
5252
}
5353
.detach();
5454
loop {
55+
self.runtime.poll_with(Some(Duration::ZERO));
56+
5557
self.runtime.run();
5658
if let Some(result) = result.take() {
5759
break result;
5860
}
5961

60-
self.runtime.poll_with(Some(Duration::ZERO));
6162
self.fd_source
6263
.enable_callbacks(kCFFileDescriptorReadCallBack);
6364
CFRunLoop::run_in_mode(
@@ -76,8 +77,12 @@ fn cf_run_loop() {
7677

7778
let event = Event::new();
7879
let handle = Arc::new(Mutex::new(Some(event.handle())));
80+
let run_loop = CFRunLoop::get_current();
7981
let block = StackBlock::new(move || {
8082
handle.lock().unwrap().take().unwrap().notify();
83+
unsafe {
84+
CFRunLoopStop(run_loop.as_concrete_TypeRef());
85+
}
8186
});
8287
extern "C" {
8388
fn CFRunLoopPerformBlock(rl: CFRunLoopRef, mode: CFStringRef, block: &Block<dyn Fn()>);
@@ -133,13 +138,13 @@ fn message_queue() {
133138
}
134139
.detach();
135140
loop {
141+
self.runtime.poll_with(Some(Duration::ZERO));
142+
136143
self.runtime.run();
137144
if let Some(result) = result.take() {
138145
break result;
139146
}
140147

141-
self.runtime.poll_with(Some(Duration::ZERO));
142-
143148
let timeout = self.runtime.current_timeout();
144149
let timeout = match timeout {
145150
Some(timeout) => timeout.as_millis() as u32,
@@ -231,13 +236,13 @@ fn glib_context() {
231236
}
232237
.detach();
233238
loop {
239+
self.runtime.poll_with(Some(Duration::ZERO));
240+
234241
self.runtime.run();
235242
if let Some(result) = result.take() {
236243
break result;
237244
}
238245

239-
self.runtime.poll_with(Some(Duration::ZERO));
240-
241246
let timeout = self.runtime.current_timeout();
242247
let source_id = timeout.map(|timeout| timeout_add_local_once(timeout, || {}));
243248

0 commit comments

Comments
 (0)