Skip to content

Commit 5fd88c6

Browse files
committed
feat(runtime): faster SendWrapper with cached thread id
1 parent 456b47b commit 5fd88c6

File tree

3 files changed

+106
-7
lines changed

3 files changed

+106
-7
lines changed

compio-runtime/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ crossbeam-queue = { workspace = true }
4040
futures-util = { workspace = true }
4141
once_cell = { workspace = true }
4242
scoped-tls = "1.0.1"
43-
send_wrapper = "0.6.0"
4443
slab = { workspace = true, optional = true }
4544
smallvec = "1.11.1"
4645
socket2 = { workspace = true }

compio-runtime/src/runtime/mod.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ use compio_driver::{
2020
use compio_log::{debug, instrument};
2121
use crossbeam_queue::SegQueue;
2222
use futures_util::{future::Either, FutureExt};
23-
use send_wrapper::SendWrapper;
2423
use smallvec::SmallVec;
2524

2625
pub(crate) mod op;
2726
#[cfg(feature = "time")]
2827
pub(crate) mod time;
2928

29+
mod send_wrapper;
30+
use send_wrapper::SendWrapper;
31+
3032
#[cfg(feature = "time")]
3133
use crate::runtime::time::{TimerFuture, TimerRuntime};
3234
use crate::{runtime::op::OpFuture, BufResult};
@@ -120,8 +122,8 @@ impl Runtime {
120122
.handle()
121123
.expect("cannot create notify handle of the proactor");
122124
let schedule = move |runnable| {
123-
if local_runnables.valid() {
124-
local_runnables.borrow_mut().push_back(runnable);
125+
if let Some(runnables) = local_runnables.get() {
126+
runnables.borrow_mut().push_back(runnable);
125127
} else {
126128
sync_runnables.push(runnable);
127129
handle.notify().ok();
@@ -136,9 +138,8 @@ impl Runtime {
136138
///
137139
/// Run the scheduled tasks.
138140
pub fn run(&self) {
139-
use std::ops::Deref;
140-
141-
let local_runnables = self.local_runnables.deref().deref();
141+
// SAFETY: self is !Send + !Sync.
142+
let local_runnables = unsafe { self.local_runnables.get_unchecked() };
142143
loop {
143144
let next_task = local_runnables.borrow_mut().pop_front();
144145
let has_local_task = next_task.is_some();
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use std::{
2+
cell::Cell,
3+
mem::{self, ManuallyDrop},
4+
thread::{self, ThreadId},
5+
};
6+
7+
thread_local! {
8+
static THREAD_ID: Cell<ThreadId> = Cell::new(thread::current().id());
9+
}
10+
11+
/// A wrapper that copied from `send_wrapper` crate, with our own optimizations.
12+
pub struct SendWrapper<T> {
13+
data: ManuallyDrop<T>,
14+
thread_id: ThreadId,
15+
}
16+
17+
impl<T> SendWrapper<T> {
18+
/// Create a `SendWrapper<T>` wrapper around a value of type `T`.
19+
/// The wrapper takes ownership of the value.
20+
pub fn new(data: T) -> SendWrapper<T> {
21+
SendWrapper {
22+
data: ManuallyDrop::new(data),
23+
thread_id: THREAD_ID.get(),
24+
}
25+
}
26+
27+
/// Returns `true` if the value can be safely accessed from within the
28+
/// current thread.
29+
pub fn valid(&self) -> bool {
30+
self.thread_id == THREAD_ID.get()
31+
}
32+
33+
/// Returns a reference to the contained value.
34+
///
35+
/// # Safety
36+
///
37+
/// The caller should be in the same thread as the creator.
38+
pub unsafe fn get_unchecked(&self) -> &T {
39+
&self.data
40+
}
41+
42+
/// Returns a reference to the contained value, if valid.
43+
pub fn get(&self) -> Option<&T> {
44+
if self.valid() { Some(&self.data) } else { None }
45+
}
46+
}
47+
48+
unsafe impl<T> Send for SendWrapper<T> {}
49+
unsafe impl<T> Sync for SendWrapper<T> {}
50+
51+
impl<T> Drop for SendWrapper<T> {
52+
/// Drops the contained value.
53+
///
54+
/// # Panics
55+
///
56+
/// Dropping panics if it is done from a different thread than the one the
57+
/// `SendWrapper<T>` instance has been created with.
58+
///
59+
/// Exceptions:
60+
/// - There is no extra panic if the thread is already panicking/unwinding.
61+
/// This is because otherwise there would be double panics (usually
62+
/// resulting in an abort) when dereferencing from a wrong thread.
63+
/// - If `T` has a trivial drop ([`needs_drop::<T>()`] is false) then this
64+
/// method never panics.
65+
///
66+
/// [`needs_drop::<T>()`]: std::mem::needs_drop
67+
#[track_caller]
68+
fn drop(&mut self) {
69+
// If the drop is trivial (`needs_drop` = false), then dropping `T` can't access
70+
// it and so it can be safely dropped on any thread.
71+
if !mem::needs_drop::<T>() || self.valid() {
72+
unsafe {
73+
// Drop the inner value
74+
//
75+
// Safety:
76+
// - We've just checked that it's valid to drop `T` on this thread
77+
// - We only move out from `self.data` here and in drop, so `self.data` is
78+
// present
79+
ManuallyDrop::drop(&mut self.data);
80+
}
81+
} else {
82+
invalid_drop()
83+
}
84+
}
85+
}
86+
87+
#[cold]
88+
#[inline(never)]
89+
#[track_caller]
90+
fn invalid_drop() {
91+
const DROP_ERROR: &str = "Dropped SendWrapper<T> variable from a thread different to the one \
92+
it has been created with.";
93+
94+
if !thread::panicking() {
95+
// panic because of dropping from wrong thread
96+
// only do this while not unwinding (could be caused by deref from wrong thread)
97+
panic!("{}", DROP_ERROR)
98+
}
99+
}

0 commit comments

Comments
 (0)