Skip to content

Commit 33a3529

Browse files
authored
Revert "[workerd-cxx] remove + Send" (#37)
This reverts commit fc3d2a8.
1 parent fc3d2a8 commit 33a3529

File tree

8 files changed

+89
-42
lines changed

8 files changed

+89
-42
lines changed

kj-rs/awaiter.c++

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ namespace kj_rs {
1313
// that end, we use bindgen to generate an opaque FFI type of known size for RustPromiseAwaiter in
1414
// awaiter.h.rs.
1515
//
16-
static_assert(sizeof(RustPromiseAwaiter) == sizeof(RustPromiseAwaiterRepr),
17-
"RustPromiseAwaiter size changed, you must update lib.rs ffi");
18-
static_assert(alignof(RustPromiseAwaiter) == alignof(RustPromiseAwaiterRepr),
19-
"RustPromiseAwaiter alignment changed, you must update lib.rs ffi");
16+
static_assert(sizeof(GuardedRustPromiseAwaiter) == sizeof(GuardedRustPromiseAwaiterRepr),
17+
"GuardedRustPromiseAwaiter size changed, you must update lib.rs ffi");
18+
static_assert(alignof(GuardedRustPromiseAwaiter) == alignof(GuardedRustPromiseAwaiterRepr),
19+
"GuardedRustPromiseAwaiter alignment changed, you must update lib.rs ffi");
2020

2121
RustPromiseAwaiter::RustPromiseAwaiter(
2222
OptionWaker& optionWaker, OwnPromiseNode nodeParam, kj::SourceLocation location)
@@ -132,11 +132,11 @@ OwnPromiseNode RustPromiseAwaiter::take_own_promise_node() {
132132
return kj::mv(node);
133133
}
134134

135-
void rust_promise_awaiter_new_in_place(
136-
RustPromiseAwaiter* ptr, OptionWaker* optionWaker, OwnPromiseNode node) {
135+
void guarded_rust_promise_awaiter_new_in_place(
136+
GuardedRustPromiseAwaiter* ptr, OptionWaker* optionWaker, OwnPromiseNode node) {
137137
kj::ctor(*ptr, *optionWaker, kj::mv(node));
138138
}
139-
void rust_promise_awaiter_drop_in_place(RustPromiseAwaiter* ptr) {
139+
void guarded_rust_promise_awaiter_drop_in_place(GuardedRustPromiseAwaiter* ptr) {
140140
kj::dtor(*ptr);
141141
}
142142

@@ -204,16 +204,20 @@ void FuturePollEvent::tracePromise(kj::_::TraceBuilder& builder, bool stopAtNext
204204
}
205205
}
206206

207-
FuturePollEvent::PollScope::PollScope(FuturePollEvent& futurePollEvent): event(futurePollEvent) {
207+
FuturePollEvent::PollScope::PollScope(FuturePollEvent& futurePollEvent): holder(futurePollEvent) {
208208
futurePollEvent.enterPollScope();
209209
}
210210

211211
FuturePollEvent::PollScope::~PollScope() noexcept(false) {
212-
event.exitPollScope(reset());
212+
holder.get().futurePollEvent.exitPollScope(reset());
213213
}
214214

215215
kj::Maybe<FuturePollEvent&> FuturePollEvent::PollScope::tryGetFuturePollEvent() const {
216-
return event;
216+
KJ_IF_SOME(h, holder.tryGet()) {
217+
return h.futurePollEvent;
218+
} else {
219+
return kj::none;
220+
}
217221
}
218222

219223
} // namespace kj_rs

kj-rs/awaiter.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,24 @@ class RustPromiseAwaiter final: public kj::_::Event,
106106
OwnPromiseNode node;
107107
};
108108

109-
void rust_promise_awaiter_new_in_place(RustPromiseAwaiter*, OptionWaker*, OwnPromiseNode);
110-
void rust_promise_awaiter_drop_in_place(RustPromiseAwaiter*);
109+
// We force Rust to call our `poll()` overloads using this ExecutorGuarded wrapper around the actual
110+
// RustPromiseAwaiter class. This allows us to assume all calls that reach RustPromiseAwaiter itself
111+
// are on the correct thread.
112+
struct GuardedRustPromiseAwaiter: ExecutorGuarded<RustPromiseAwaiter> {
113+
// We need to inherit constructors or else placement-new will try to aggregate-initialize us.
114+
using ExecutorGuarded<RustPromiseAwaiter>::ExecutorGuarded;
115+
116+
bool poll(const WakerRef& waker, const KjWaker* maybeKjWaker) {
117+
return get().poll(waker, maybeKjWaker);
118+
}
119+
OwnPromiseNode take_own_promise_node() {
120+
return get().take_own_promise_node();
121+
}
122+
};
123+
124+
void guarded_rust_promise_awaiter_new_in_place(
125+
GuardedRustPromiseAwaiter*, OptionWaker*, OwnPromiseNode);
126+
void guarded_rust_promise_awaiter_drop_in_place(GuardedRustPromiseAwaiter*);
111127

112128
// =======================================================================================
113129
// FuturePollEvent
@@ -171,7 +187,10 @@ class FuturePollEvent::PollScope: public LazyArcWaker {
171187
kj::Maybe<FuturePollEvent&> tryGetFuturePollEvent() const override;
172188

173189
private:
174-
FuturePollEvent& event;
190+
struct FuturePollEventHolder {
191+
FuturePollEvent& futurePollEvent;
192+
};
193+
ExecutorGuarded<FuturePollEventHolder> holder;
175194
};
176195

177196
// =======================================================================================

kj-rs/awaiter.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,19 @@ use std::mem::MaybeUninit;
22
use std::pin::Pin;
33
use std::task::Context;
44

5-
use crate::ffi::{RustPromiseAwaiter, RustPromiseAwaiterRepr};
5+
use crate::ffi::{GuardedRustPromiseAwaiter, GuardedRustPromiseAwaiterRepr};
66
use crate::waker::try_into_kj_waker_ptr;
77

8+
// =======================================================================================
9+
// GuardedRustPromiseAwaiter
10+
11+
// Safety: KJ Promises are not associated with threads, but with event loops at construction time.
12+
// Therefore, they can be polled from any thread, as long as that thread has the correct event loop
13+
// active at the time of the call to `poll()`. If the correct event loop is not active,
14+
// GuardedRustPromiseAwaiter's API will panic. (The Guarded- prefix refers to the C++ class template
15+
// ExecutorGuarded, which enforces the correct event loop requirement.)
16+
unsafe impl Send for GuardedRustPromiseAwaiter {}
17+
818
// =======================================================================================
919
// Await syntax for OwnPromiseNode
1020

@@ -13,7 +23,7 @@ use crate::OwnPromiseNode;
1323
pub struct PromiseAwaiter<Data: std::marker::Unpin> {
1424
node: Option<OwnPromiseNode>,
1525
pub(crate) data: Data,
16-
awaiter: MaybeUninit<RustPromiseAwaiterRepr>,
26+
awaiter: MaybeUninit<GuardedRustPromiseAwaiterRepr>,
1727
awaiter_initialized: bool,
1828
// Safety: `option_waker` must be declared after `awaiter`, because `awaiter` contains a reference
1929
// to `option_waker`. This ensures `option_waker` will be dropped after `awaiter`.
@@ -35,19 +45,19 @@ impl<Data: std::marker::Unpin> PromiseAwaiter<Data> {
3545
///
3646
/// Panics if `node` is None.
3747
#[must_use]
38-
pub fn get_awaiter(mut self: Pin<&mut Self>) -> Pin<&mut RustPromiseAwaiter> {
48+
pub fn get_awaiter(mut self: Pin<&mut Self>) -> Pin<&mut GuardedRustPromiseAwaiter> {
3949
// Safety: We never move out of `this`.
4050
let this = unsafe { Pin::into_inner_unchecked(self.as_mut()) };
4151

4252
// Initialize the awaiter if not already done
4353
if !this.awaiter_initialized {
4454
// On our first invocation, `node` will be Some, and `get_awaiter` will forward its
45-
// contents into RustPromiseAwaiter's constructor. On all subsequent invocations, `node`
55+
// contents into GuardedRustPromiseAwaiter's constructor. On all subsequent invocations, `node`
4656
// will be None and the constructor will not run.
4757
let node = this.node.take();
4858

4959
// Safety: `awaiter` stores `rust_waker_ptr` and uses it to call `wake()`. Note that
50-
// `awaiter` is `this.awaiter`, which lives before `this.option_waker`.
60+
// `awaiter` is `this.awaiter`, which lives before `this.option_waker`.
5161
// Since we drop awaiter manually, the `rust_waker_ptr` that `awaiter` stores will always
5262
// be valid during its lifetime.
5363
//
@@ -58,8 +68,8 @@ impl<Data: std::marker::Unpin> PromiseAwaiter<Data> {
5868

5969
// Safety: The memory slot is valid and this type ensures that it will stay pinned.
6070
unsafe {
61-
crate::ffi::rust_promise_awaiter_new_in_place(
62-
this.awaiter.as_mut_ptr().cast::<RustPromiseAwaiter>(),
71+
crate::ffi::guarded_rust_promise_awaiter_new_in_place(
72+
this.awaiter.as_mut_ptr().cast::<GuardedRustPromiseAwaiter>(),
6373
rust_waker_ptr,
6474
node.expect("node should be Some in call to init()"),
6575
);
@@ -69,8 +79,8 @@ impl<Data: std::marker::Unpin> PromiseAwaiter<Data> {
6979

7080
// Safety: `this.awaiter` is pinned since `self` is pinned.
7181
unsafe {
72-
let raw = this.awaiter.assume_init_mut() as *mut RustPromiseAwaiterRepr;
73-
let raw = raw.cast::<RustPromiseAwaiter>();
82+
let raw = this.awaiter.assume_init_mut() as *mut GuardedRustPromiseAwaiterRepr;
83+
let raw = raw.cast::<GuardedRustPromiseAwaiter>();
7484
Pin::new_unchecked(&mut *raw)
7585
}
7686
}
@@ -87,8 +97,8 @@ impl<Data: std::marker::Unpin> Drop for PromiseAwaiter<Data> {
8797
fn drop(&mut self) {
8898
if self.awaiter_initialized {
8999
unsafe {
90-
crate::ffi::rust_promise_awaiter_drop_in_place(
91-
self.awaiter.as_mut_ptr().cast::<RustPromiseAwaiter>(),
100+
crate::ffi::guarded_rust_promise_awaiter_drop_in_place(
101+
self.awaiter.as_mut_ptr().cast::<GuardedRustPromiseAwaiter>()
92102
);
93103
}
94104
}

kj-rs/future.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,19 @@ pub(crate) mod repr {
3838

3939
type DropCallback = unsafe extern "C" fn(fut: *mut c_void);
4040

41-
type FuturePtr<'a, T> = *mut (dyn Future<Output = Result<T, String>> + 'a);
41+
type FuturePtr<'a, T> = *mut (dyn Future<Output = Result<T, String>> + Send + 'a);
4242

43-
/// Represents a `dyn Future<Output = Result<T, String>>`.
43+
/// Represents a `dyn Future<Output = Result<T, String>>` + Send.
4444
#[repr(C)]
4545
pub struct RustFuture<'a, T> {
4646
pub fut: FuturePtr<'a, T>,
4747
pub poll: PollCallback,
4848
pub drop: DropCallback,
4949
}
5050

51-
type InfallibleFuturePtr<'a, T> = *mut (dyn Future<Output = T> + 'a);
51+
type InfallibleFuturePtr<'a, T> = *mut (dyn Future<Output = T> + Send + 'a);
5252

53-
/// Represents a `dyn Future<Output = T>` where T is not a Result.
53+
/// Represents a `dyn Future<Output = T> + Send` where T is not a Result.
5454
#[repr(C)]
5555
pub struct RustInfallibleFuture<'a, T> {
5656
pub fut: InfallibleFuturePtr<'a, T>,
@@ -127,7 +127,7 @@ pub(crate) mod repr {
127127

128128
#[must_use]
129129
pub fn future<'a, T: Unpin>(
130-
fut: Pin<Box<dyn Future<Output = Result<T, String>> + 'a>>,
130+
fut: Pin<Box<dyn Future<Output = Result<T, String>> + Send + 'a>>,
131131
) -> RustFuture<'a, T> {
132132
let fut = Box::into_raw(unsafe { Pin::into_inner_unchecked(fut) });
133133
let poll = RustFuture::<T>::poll;
@@ -137,7 +137,7 @@ pub(crate) mod repr {
137137

138138
#[must_use]
139139
pub fn infallible_future<'a, T: Unpin>(
140-
fut: Pin<Box<dyn Future<Output = T> + 'a>>,
140+
fut: Pin<Box<dyn Future<Output = T> + Send + 'a>>,
141141
) -> RustInfallibleFuture<'a, T> {
142142
let fut = Box::into_raw(unsafe { Pin::into_inner_unchecked(fut) });
143143
let poll = RustInfallibleFuture::<T>::poll;

kj-rs/lib.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ pub type Error = std::io::Error;
2626
#[allow(clippy::needless_lifetimes)]
2727
mod ffi {
2828

29-
/// Representation of a `RustPromiseAwaiter` in C++. The size of the blob should match.
29+
/// Representation of a `GuardedRustPromiseAwaiter` in C++. The size of the blob should match.
3030
#[derive(Debug)]
31-
pub struct RustPromiseAwaiterRepr {
32-
_bindgen_opaque_blob: [u64; 14usize],
31+
pub struct GuardedRustPromiseAwaiterRepr {
32+
_bindgen_opaque_blob: [u64; 15usize],
3333
}
3434

3535
extern "Rust" {
@@ -73,22 +73,22 @@ mod ffi {
7373
unsafe extern "C++" {
7474
include!("kj-rs/awaiter.h");
7575

76-
type RustPromiseAwaiter;
76+
type GuardedRustPromiseAwaiter;
7777

78-
unsafe fn rust_promise_awaiter_new_in_place(
79-
ptr: *mut RustPromiseAwaiter,
78+
unsafe fn guarded_rust_promise_awaiter_new_in_place(
79+
ptr: *mut GuardedRustPromiseAwaiter,
8080
rust_waker_ptr: *mut OptionWaker,
8181
node: OwnPromiseNode,
8282
);
83-
unsafe fn rust_promise_awaiter_drop_in_place(ptr: *mut RustPromiseAwaiter);
83+
unsafe fn guarded_rust_promise_awaiter_drop_in_place(ptr: *mut GuardedRustPromiseAwaiter);
8484

8585
unsafe fn poll(
86-
self: Pin<&mut RustPromiseAwaiter>,
86+
self: Pin<&mut GuardedRustPromiseAwaiter>,
8787
waker: &WakerRef,
8888
maybe_kj_waker: *const KjWaker,
8989
) -> bool;
9090

9191
#[must_use]
92-
fn take_own_promise_node(self: Pin<&mut RustPromiseAwaiter>) -> OwnPromiseNode;
92+
fn take_own_promise_node(self: Pin<&mut GuardedRustPromiseAwaiter>) -> OwnPromiseNode;
9393
}
9494
}

kj-rs/promise.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@ type CxxResult<T> = std::result::Result<T, cxx::Exception>;
1919
#[repr(transparent)]
2020
pub struct OwnPromiseNode(*mut c_void /* kj::_::PromiseNode* */);
2121

22+
// Safety: KJ Promises are not associated with threads, but with event loops at construction time.
23+
// Therefore, they can be polled from any thread, as long as that thread has the correct event loop
24+
// active at the time of the call to `poll()`. If the correct event loop is not active, the
25+
// OwnPromiseNode's API will typically panic, undefined behavior could be possible. However, Rust
26+
// doesn't have direct access to OwnPromiseNode's API. Instead, it can only use the Promise by
27+
// having GuardedRustPromiseAwaiter consume it, and GuardedRustPromiseAwaiter implements the
28+
// correct-executor guarantee.
29+
unsafe impl Send for OwnPromiseNode {}
30+
2231
// Note: drop is not the only way for OwnPromiseNode to be destroyed.
2332
// It is forgotten using `MaybeUninit` and its ownership passed over to c++ in `unwrap`.
2433
impl Drop for OwnPromiseNode {
@@ -142,3 +151,5 @@ impl<T> KjPromise for CallbacksFuture<T> {
142151
Ok(unsafe { ret.assume_init() })
143152
}
144153
}
154+
155+
unsafe impl<T: Send> Send for CallbacksFuture<T> {}

kj-rs/tests/test_futures.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,10 @@ pub async fn new_layered_ready_future_void() -> Result<()> {
149149
}
150150

151151
// From example at https://doc.rust-lang.org/std/future/fn.poll_fn.html#capturing-a-pinned-state
152-
async fn naive_select<T>(a: impl Future<Output = T>, b: impl Future<Output = T>) -> T {
152+
async fn naive_select<T>(
153+
a: impl Future<Output = T> + Send,
154+
b: impl Future<Output = T> + Send,
155+
) -> T {
153156
let (mut a, mut b) = (pin!(a), pin!(b));
154157
future::poll_fn(move |cx| {
155158
if let Poll::Ready(r) = a.as_mut().poll(cx) {

macro/src/expand.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,9 +1268,9 @@ fn expand_rust_function_shim_super(
12681268
};
12691269

12701270
if fut.throws_tokens.is_some() {
1271-
quote!(-> std::pin::Pin<Box<dyn ::std::future::Future<Output = ::std::result::Result<#output, String>> #lifetimes>>)
1271+
quote!(-> std::pin::Pin<Box<dyn ::std::future::Future<Output = ::std::result::Result<#output, String>> + Send #lifetimes>>)
12721272
} else {
1273-
quote!(-> std::pin::Pin<Box<dyn ::std::future::Future<Output = #output> #lifetimes>>)
1273+
quote!(-> std::pin::Pin<Box<dyn ::std::future::Future<Output = #output> + Send #lifetimes>>)
12741274
}
12751275
} else {
12761276
expand_return_type(&sig.ret)

0 commit comments

Comments
 (0)