Skip to content

Commit fc3d2a8

Browse files
committed
[workerd-cxx] remove + Send
KJ event loop runs only on a single thread, so it is not legal to pass futures/promises to a different one. Removing +Send achieves this. Green threads while technically pass event loops between threads, they are creating a mirage of staying within the same thread. So we can use existing type system protection. This doesn't take opportinity to simplify the rest of the code (it follows). With this change the following code now fails appropriately: let x = ffi::new_ready_promise_void(); std::thread::spawn(async || { let _ = x.await; });
1 parent bcb514c commit fc3d2a8

File tree

8 files changed

+42
-89
lines changed

8 files changed

+42
-89
lines changed

kj-rs/awaiter.c++

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ namespace kj_rs {
1313
// that end, we use bindgen to generate an opaque FFI type of known size for RustPromiseAwaiter in
1414
// awaiter.h.rs.
1515
//
16-
static_assert(sizeof(GuardedRustPromiseAwaiter) == sizeof(GuardedRustPromiseAwaiterRepr),
17-
"GuardedRustPromiseAwaiter size changed, you must update lib.rs ffi");
18-
static_assert(alignof(GuardedRustPromiseAwaiter) == alignof(GuardedRustPromiseAwaiterRepr),
19-
"GuardedRustPromiseAwaiter alignment changed, you must update lib.rs ffi");
16+
static_assert(sizeof(RustPromiseAwaiter) == sizeof(RustPromiseAwaiterRepr),
17+
"RustPromiseAwaiter size changed, you must update lib.rs ffi");
18+
static_assert(alignof(RustPromiseAwaiter) == alignof(RustPromiseAwaiterRepr),
19+
"RustPromiseAwaiter alignment changed, you must update lib.rs ffi");
2020

2121
RustPromiseAwaiter::RustPromiseAwaiter(
2222
OptionWaker& optionWaker, OwnPromiseNode nodeParam, kj::SourceLocation location)
@@ -132,11 +132,11 @@ OwnPromiseNode RustPromiseAwaiter::take_own_promise_node() {
132132
return kj::mv(node);
133133
}
134134

135-
void guarded_rust_promise_awaiter_new_in_place(
136-
GuardedRustPromiseAwaiter* ptr, OptionWaker* optionWaker, OwnPromiseNode node) {
135+
void rust_promise_awaiter_new_in_place(
136+
RustPromiseAwaiter* ptr, OptionWaker* optionWaker, OwnPromiseNode node) {
137137
kj::ctor(*ptr, *optionWaker, kj::mv(node));
138138
}
139-
void guarded_rust_promise_awaiter_drop_in_place(GuardedRustPromiseAwaiter* ptr) {
139+
void rust_promise_awaiter_drop_in_place(RustPromiseAwaiter* ptr) {
140140
kj::dtor(*ptr);
141141
}
142142

@@ -204,20 +204,16 @@ void FuturePollEvent::tracePromise(kj::_::TraceBuilder& builder, bool stopAtNext
204204
}
205205
}
206206

207-
FuturePollEvent::PollScope::PollScope(FuturePollEvent& futurePollEvent): holder(futurePollEvent) {
207+
FuturePollEvent::PollScope::PollScope(FuturePollEvent& futurePollEvent): event(futurePollEvent) {
208208
futurePollEvent.enterPollScope();
209209
}
210210

211211
FuturePollEvent::PollScope::~PollScope() noexcept(false) {
212-
holder.get().futurePollEvent.exitPollScope(reset());
212+
event.exitPollScope(reset());
213213
}
214214

215215
kj::Maybe<FuturePollEvent&> FuturePollEvent::PollScope::tryGetFuturePollEvent() const {
216-
KJ_IF_SOME(h, holder.tryGet()) {
217-
return h.futurePollEvent;
218-
} else {
219-
return kj::none;
220-
}
216+
return event;
221217
}
222218

223219
} // namespace kj_rs

kj-rs/awaiter.h

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -106,24 +106,8 @@ class RustPromiseAwaiter final: public kj::_::Event,
106106
OwnPromiseNode node;
107107
};
108108

109-
// We force Rust to call our `poll()` overloads using this ExecutorGuarded wrapper around the actual
110-
// RustPromiseAwaiter class. This allows us to assume all calls that reach RustPromiseAwaiter itself
111-
// are on the correct thread.
112-
struct GuardedRustPromiseAwaiter: ExecutorGuarded<RustPromiseAwaiter> {
113-
// We need to inherit constructors or else placement-new will try to aggregate-initialize us.
114-
using ExecutorGuarded<RustPromiseAwaiter>::ExecutorGuarded;
115-
116-
bool poll(const WakerRef& waker, const KjWaker* maybeKjWaker) {
117-
return get().poll(waker, maybeKjWaker);
118-
}
119-
OwnPromiseNode take_own_promise_node() {
120-
return get().take_own_promise_node();
121-
}
122-
};
123-
124-
void guarded_rust_promise_awaiter_new_in_place(
125-
GuardedRustPromiseAwaiter*, OptionWaker*, OwnPromiseNode);
126-
void guarded_rust_promise_awaiter_drop_in_place(GuardedRustPromiseAwaiter*);
109+
void rust_promise_awaiter_new_in_place(RustPromiseAwaiter*, OptionWaker*, OwnPromiseNode);
110+
void rust_promise_awaiter_drop_in_place(RustPromiseAwaiter*);
127111

128112
// =======================================================================================
129113
// FuturePollEvent
@@ -187,10 +171,7 @@ class FuturePollEvent::PollScope: public LazyArcWaker {
187171
kj::Maybe<FuturePollEvent&> tryGetFuturePollEvent() const override;
188172

189173
private:
190-
struct FuturePollEventHolder {
191-
FuturePollEvent& futurePollEvent;
192-
};
193-
ExecutorGuarded<FuturePollEventHolder> holder;
174+
FuturePollEvent& event;
194175
};
195176

196177
// =======================================================================================

kj-rs/awaiter.rs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,9 @@ use std::mem::MaybeUninit;
22
use std::pin::Pin;
33
use std::task::Context;
44

5-
use crate::ffi::{GuardedRustPromiseAwaiter, GuardedRustPromiseAwaiterRepr};
5+
use crate::ffi::{RustPromiseAwaiter, RustPromiseAwaiterRepr};
66
use crate::waker::try_into_kj_waker_ptr;
77

8-
// =======================================================================================
9-
// GuardedRustPromiseAwaiter
10-
11-
// Safety: KJ Promises are not associated with threads, but with event loops at construction time.
12-
// Therefore, they can be polled from any thread, as long as that thread has the correct event loop
13-
// active at the time of the call to `poll()`. If the correct event loop is not active,
14-
// GuardedRustPromiseAwaiter's API will panic. (The Guarded- prefix refers to the C++ class template
15-
// ExecutorGuarded, which enforces the correct event loop requirement.)
16-
unsafe impl Send for GuardedRustPromiseAwaiter {}
17-
188
// =======================================================================================
199
// Await syntax for OwnPromiseNode
2010

@@ -23,7 +13,7 @@ use crate::OwnPromiseNode;
2313
pub struct PromiseAwaiter<Data: std::marker::Unpin> {
2414
node: Option<OwnPromiseNode>,
2515
pub(crate) data: Data,
26-
awaiter: MaybeUninit<GuardedRustPromiseAwaiterRepr>,
16+
awaiter: MaybeUninit<RustPromiseAwaiterRepr>,
2717
awaiter_initialized: bool,
2818
// Safety: `option_waker` must be declared after `awaiter`, because `awaiter` contains a reference
2919
// to `option_waker`. This ensures `option_waker` will be dropped after `awaiter`.
@@ -45,19 +35,19 @@ impl<Data: std::marker::Unpin> PromiseAwaiter<Data> {
4535
///
4636
/// Panics if `node` is None.
4737
#[must_use]
48-
pub fn get_awaiter(mut self: Pin<&mut Self>) -> Pin<&mut GuardedRustPromiseAwaiter> {
38+
pub fn get_awaiter(mut self: Pin<&mut Self>) -> Pin<&mut RustPromiseAwaiter> {
4939
// Safety: We never move out of `this`.
5040
let this = unsafe { Pin::into_inner_unchecked(self.as_mut()) };
5141

5242
// Initialize the awaiter if not already done
5343
if !this.awaiter_initialized {
5444
// On our first invocation, `node` will be Some, and `get_awaiter` will forward its
55-
// contents into GuardedRustPromiseAwaiter's constructor. On all subsequent invocations, `node`
45+
// contents into RustPromiseAwaiter's constructor. On all subsequent invocations, `node`
5646
// will be None and the constructor will not run.
5747
let node = this.node.take();
5848

5949
// Safety: `awaiter` stores `rust_waker_ptr` and uses it to call `wake()`. Note that
60-
// `awaiter` is `this.awaiter`, which lives before `this.option_waker`.
50+
// `awaiter` is `this.awaiter`, which lives before `this.option_waker`.
6151
// Since we drop awaiter manually, the `rust_waker_ptr` that `awaiter` stores will always
6252
// be valid during its lifetime.
6353
//
@@ -68,8 +58,8 @@ impl<Data: std::marker::Unpin> PromiseAwaiter<Data> {
6858

6959
// Safety: The memory slot is valid and this type ensures that it will stay pinned.
7060
unsafe {
71-
crate::ffi::guarded_rust_promise_awaiter_new_in_place(
72-
this.awaiter.as_mut_ptr().cast::<GuardedRustPromiseAwaiter>(),
61+
crate::ffi::rust_promise_awaiter_new_in_place(
62+
this.awaiter.as_mut_ptr().cast::<RustPromiseAwaiter>(),
7363
rust_waker_ptr,
7464
node.expect("node should be Some in call to init()"),
7565
);
@@ -79,8 +69,8 @@ impl<Data: std::marker::Unpin> PromiseAwaiter<Data> {
7969

8070
// Safety: `this.awaiter` is pinned since `self` is pinned.
8171
unsafe {
82-
let raw = this.awaiter.assume_init_mut() as *mut GuardedRustPromiseAwaiterRepr;
83-
let raw = raw.cast::<GuardedRustPromiseAwaiter>();
72+
let raw = this.awaiter.assume_init_mut() as *mut RustPromiseAwaiterRepr;
73+
let raw = raw.cast::<RustPromiseAwaiter>();
8474
Pin::new_unchecked(&mut *raw)
8575
}
8676
}
@@ -97,8 +87,8 @@ impl<Data: std::marker::Unpin> Drop for PromiseAwaiter<Data> {
9787
fn drop(&mut self) {
9888
if self.awaiter_initialized {
9989
unsafe {
100-
crate::ffi::guarded_rust_promise_awaiter_drop_in_place(
101-
self.awaiter.as_mut_ptr().cast::<GuardedRustPromiseAwaiter>()
90+
crate::ffi::rust_promise_awaiter_drop_in_place(
91+
self.awaiter.as_mut_ptr().cast::<RustPromiseAwaiter>(),
10292
);
10393
}
10494
}

kj-rs/future.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,19 @@ pub(crate) mod repr {
3838

3939
type DropCallback = unsafe extern "C" fn(fut: *mut c_void);
4040

41-
type FuturePtr<'a, T> = *mut (dyn Future<Output = Result<T, String>> + Send + 'a);
41+
type FuturePtr<'a, T> = *mut (dyn Future<Output = Result<T, String>> + 'a);
4242

43-
/// Represents a `dyn Future<Output = Result<T, String>>` + Send.
43+
/// Represents a `dyn Future<Output = Result<T, String>>`.
4444
#[repr(C)]
4545
pub struct RustFuture<'a, T> {
4646
pub fut: FuturePtr<'a, T>,
4747
pub poll: PollCallback,
4848
pub drop: DropCallback,
4949
}
5050

51-
type InfallibleFuturePtr<'a, T> = *mut (dyn Future<Output = T> + Send + 'a);
51+
type InfallibleFuturePtr<'a, T> = *mut (dyn Future<Output = T> + 'a);
5252

53-
/// Represents a `dyn Future<Output = T> + Send` where T is not a Result.
53+
/// Represents a `dyn Future<Output = T>` where T is not a Result.
5454
#[repr(C)]
5555
pub struct RustInfallibleFuture<'a, T> {
5656
pub fut: InfallibleFuturePtr<'a, T>,
@@ -127,7 +127,7 @@ pub(crate) mod repr {
127127

128128
#[must_use]
129129
pub fn future<'a, T: Unpin>(
130-
fut: Pin<Box<dyn Future<Output = Result<T, String>> + Send + 'a>>,
130+
fut: Pin<Box<dyn Future<Output = Result<T, String>> + 'a>>,
131131
) -> RustFuture<'a, T> {
132132
let fut = Box::into_raw(unsafe { Pin::into_inner_unchecked(fut) });
133133
let poll = RustFuture::<T>::poll;
@@ -137,7 +137,7 @@ pub(crate) mod repr {
137137

138138
#[must_use]
139139
pub fn infallible_future<'a, T: Unpin>(
140-
fut: Pin<Box<dyn Future<Output = T> + Send + 'a>>,
140+
fut: Pin<Box<dyn Future<Output = T> + 'a>>,
141141
) -> RustInfallibleFuture<'a, T> {
142142
let fut = Box::into_raw(unsafe { Pin::into_inner_unchecked(fut) });
143143
let poll = RustInfallibleFuture::<T>::poll;

kj-rs/lib.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ pub type Error = std::io::Error;
2626
#[allow(clippy::needless_lifetimes)]
2727
mod ffi {
2828

29-
/// Representation of a `GuardedRustPromiseAwaiter` in C++. The size of the blob should match.
29+
/// Representation of a `RustPromiseAwaiter` in C++. The size of the blob should match.
3030
#[derive(Debug)]
31-
pub struct GuardedRustPromiseAwaiterRepr {
32-
_bindgen_opaque_blob: [u64; 15usize],
31+
pub struct RustPromiseAwaiterRepr {
32+
_bindgen_opaque_blob: [u64; 14usize],
3333
}
3434

3535
extern "Rust" {
@@ -73,22 +73,22 @@ mod ffi {
7373
unsafe extern "C++" {
7474
include!("kj-rs/awaiter.h");
7575

76-
type GuardedRustPromiseAwaiter;
76+
type RustPromiseAwaiter;
7777

78-
unsafe fn guarded_rust_promise_awaiter_new_in_place(
79-
ptr: *mut GuardedRustPromiseAwaiter,
78+
unsafe fn rust_promise_awaiter_new_in_place(
79+
ptr: *mut RustPromiseAwaiter,
8080
rust_waker_ptr: *mut OptionWaker,
8181
node: OwnPromiseNode,
8282
);
83-
unsafe fn guarded_rust_promise_awaiter_drop_in_place(ptr: *mut GuardedRustPromiseAwaiter);
83+
unsafe fn rust_promise_awaiter_drop_in_place(ptr: *mut RustPromiseAwaiter);
8484

8585
unsafe fn poll(
86-
self: Pin<&mut GuardedRustPromiseAwaiter>,
86+
self: Pin<&mut RustPromiseAwaiter>,
8787
waker: &WakerRef,
8888
maybe_kj_waker: *const KjWaker,
8989
) -> bool;
9090

9191
#[must_use]
92-
fn take_own_promise_node(self: Pin<&mut GuardedRustPromiseAwaiter>) -> OwnPromiseNode;
92+
fn take_own_promise_node(self: Pin<&mut RustPromiseAwaiter>) -> OwnPromiseNode;
9393
}
9494
}

kj-rs/promise.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,6 @@ type CxxResult<T> = std::result::Result<T, cxx::Exception>;
1919
#[repr(transparent)]
2020
pub struct OwnPromiseNode(*mut c_void /* kj::_::PromiseNode* */);
2121

22-
// Safety: KJ Promises are not associated with threads, but with event loops at construction time.
23-
// Therefore, they can be polled from any thread, as long as that thread has the correct event loop
24-
// active at the time of the call to `poll()`. If the correct event loop is not active, the
25-
// OwnPromiseNode's API will typically panic, undefined behavior could be possible. However, Rust
26-
// doesn't have direct access to OwnPromiseNode's API. Instead, it can only use the Promise by
27-
// having GuardedRustPromiseAwaiter consume it, and GuardedRustPromiseAwaiter implements the
28-
// correct-executor guarantee.
29-
unsafe impl Send for OwnPromiseNode {}
30-
3122
// Note: drop is not the only way for OwnPromiseNode to be destroyed.
3223
// It is forgotten using `MaybeUninit` and its ownership passed over to c++ in `unwrap`.
3324
impl Drop for OwnPromiseNode {
@@ -151,5 +142,3 @@ impl<T> KjPromise for CallbacksFuture<T> {
151142
Ok(unsafe { ret.assume_init() })
152143
}
153144
}
154-
155-
unsafe impl<T: Send> Send for CallbacksFuture<T> {}

kj-rs/tests/test_futures.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,7 @@ pub async fn new_layered_ready_future_void() -> Result<()> {
149149
}
150150

151151
// From example at https://doc.rust-lang.org/std/future/fn.poll_fn.html#capturing-a-pinned-state
152-
async fn naive_select<T>(
153-
a: impl Future<Output = T> + Send,
154-
b: impl Future<Output = T> + Send,
155-
) -> T {
152+
async fn naive_select<T>(a: impl Future<Output = T>, b: impl Future<Output = T>) -> T {
156153
let (mut a, mut b) = (pin!(a), pin!(b));
157154
future::poll_fn(move |cx| {
158155
if let Poll::Ready(r) = a.as_mut().poll(cx) {

macro/src/expand.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,9 +1268,9 @@ fn expand_rust_function_shim_super(
12681268
};
12691269

12701270
if fut.throws_tokens.is_some() {
1271-
quote!(-> std::pin::Pin<Box<dyn ::std::future::Future<Output = ::std::result::Result<#output, String>> + Send #lifetimes>>)
1271+
quote!(-> std::pin::Pin<Box<dyn ::std::future::Future<Output = ::std::result::Result<#output, String>> #lifetimes>>)
12721272
} else {
1273-
quote!(-> std::pin::Pin<Box<dyn ::std::future::Future<Output = #output> + Send #lifetimes>>)
1273+
quote!(-> std::pin::Pin<Box<dyn ::std::future::Future<Output = #output> #lifetimes>>)
12741274
}
12751275
} else {
12761276
expand_return_type(&sig.ret)

0 commit comments

Comments
 (0)