Skip to content

Commit b98f2e8

Browse files
authored
Support "linear writes" in Rust future bindings (#1306)
To account for WebAssembly/wasip3-prototyping#172 and WebAssembly/component-model#521 the Rust bindings need to take into account that a future writer cannot be dropped without actually writing something. To model this futures now must be created with a thunk that creates the default value to be sent. This value is only used if another value was not otherwise written. Closes #1304
1 parent 5141578 commit b98f2e8

File tree

28 files changed

+241
-111
lines changed

28 files changed

+241
-111
lines changed

crates/guest-rust/rt/src/async_support/future_support.rs

Lines changed: 100 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,30 @@
8686
//! represented the same in Rust as it is in the canonical ABI. That means that
8787
//! sending `list<T>` into a future might require copying the entire list and
8888
//! changing its layout. Currently this is par-for-the-course with bindings.
89+
//!
90+
//! ## Linear (exactly once) Writes
91+
//!
92+
//! The component model requires that a writable end of a future must be written
93+
//! to before closing, otherwise the close operation traps. Ideally usage of
94+
//! this API shouldn't result in traps so this is modeled in the Rust-level API
95+
//! to prevent this trap from occurring. Rust does not support linear types
96+
//! (types that must be used exactly once), instead it only has affine types
97+
//! (types which must be used at most once), meaning that this requires some
98+
//! runtime support.
99+
//!
100+
//! Specifically the `FutureWriter` structure stores two auxiliary Rust-specific
101+
//! pieces of information:
102+
//!
103+
//! * A `should_write_default_value` boolean - if `true` on destruction then a
104+
//! value has not yet been written and something must be written.
105+
//! * A `default: fn() -> T` constructor to lazily create the default value to
106+
//! be sent in this situation.
107+
//!
108+
//! This `default` field is provided by the user when the future is initially
109+
//! created. Additionally during `Drop` a new Rust-level task will be spawned to
110+
//! perform the write in the background. That'll keep the component-level task
111+
//! alive until that write completes but otherwise shouldn't hinder anything
112+
//! else.
89113
90114
use {
91115
super::waitable::{WaitableOp, WaitableOperation},
@@ -162,6 +186,7 @@ pub struct FutureVtable<T> {
162186
/// This function is unsafe as it requires the functions within `vtable` to
163187
/// correctly uphold the contracts of the component model.
164188
pub unsafe fn future_new<T>(
189+
default: fn() -> T,
165190
vtable: &'static FutureVtable<T>,
166191
) -> (FutureWriter<T>, FutureReader<T>) {
167192
unsafe {
@@ -170,7 +195,7 @@ pub unsafe fn future_new<T>(
170195
let writer = (handles >> 32) as u32;
171196
rtdebug!("future.new() = [{writer}, {reader}]");
172197
(
173-
FutureWriter::new(writer, vtable),
198+
FutureWriter::new(writer, default, vtable),
174199
FutureReader::new(reader, vtable),
175200
)
176201
}
@@ -183,6 +208,19 @@ pub unsafe fn future_new<T>(
183208
pub struct FutureWriter<T: 'static> {
184209
handle: u32,
185210
vtable: &'static FutureVtable<T>,
211+
212+
/// Whether or not a value should be written during `drop`.
213+
///
214+
/// This is set to `false` when a value is successfully written or when a
215+
/// value is written but the future is witnessed as being closed.
216+
///
217+
/// Note that this is set to `true` on construction to ensure that only
218+
/// location which actually witness a completed write set it to `false`.
219+
should_write_default_value: bool,
220+
221+
/// Constructor for the default value to write during `drop`, should one
222+
/// need to be written.
223+
default: fn() -> T,
186224
}
187225

188226
impl<T> FutureWriter<T> {
@@ -193,8 +231,13 @@ impl<T> FutureWriter<T> {
193231
/// This function is unsafe as it requires the functions within `vtable` to
194232
/// correctly uphold the contracts of the component model.
195233
#[doc(hidden)]
196-
pub unsafe fn new(handle: u32, vtable: &'static FutureVtable<T>) -> Self {
197-
Self { handle, vtable }
234+
pub unsafe fn new(handle: u32, default: fn() -> T, vtable: &'static FutureVtable<T>) -> Self {
235+
Self {
236+
handle,
237+
default,
238+
should_write_default_value: true,
239+
vtable,
240+
}
198241
}
199242

200243
/// Write the specified `value` to this `future`.
@@ -241,9 +284,33 @@ impl<T> fmt::Debug for FutureWriter<T> {
241284

242285
impl<T> Drop for FutureWriter<T> {
243286
fn drop(&mut self) {
244-
unsafe {
245-
rtdebug!("future.close-writable({})", self.handle);
246-
(self.vtable.close_writable)(self.handle);
287+
// If a value has not yet been written into this writer than that must
288+
// be done so now. Perform a "clone" of `self` by moving our data into a
289+
// subtask, but ensure that `should_write_default_value` is set to
290+
// `false` to avoid infinite loops by accident. Once the task is spawned
291+
// we're done and the subtask's destructor of the closed-over
292+
// `FutureWriter` will be responsible for performing the
293+
// `close-writable` call below.
294+
//
295+
// Note, though, that if `should_write_default_value` is `false` then a
296+
// write has already happened and we can go ahead and just synchronously
297+
// drop this writer as we would any other handle.
298+
if self.should_write_default_value {
299+
let clone = FutureWriter {
300+
handle: self.handle,
301+
default: self.default,
302+
should_write_default_value: false,
303+
vtable: self.vtable,
304+
};
305+
crate::async_support::spawn(async move {
306+
let value = (clone.default)();
307+
let _ = clone.write(value).await;
308+
});
309+
} else {
310+
unsafe {
311+
rtdebug!("future.close-writable({})", self.handle);
312+
(self.vtable.close_writable)(self.handle);
313+
}
247314
}
248315
}
249316
}
@@ -299,7 +366,7 @@ where
299366
}
300367

301368
fn in_progress_update(
302-
(writer, cleanup): Self::InProgress,
369+
(mut writer, cleanup): Self::InProgress,
303370
code: u32,
304371
) -> Result<Self::Result, Self::InProgress> {
305372
let ptr = cleanup
@@ -321,6 +388,11 @@ where
321388
// pass here.
322389
let value = unsafe { (writer.vtable.lift)(ptr) };
323390
let status = if c == ReturnCode::Closed(0) {
391+
// This writer has been witnessed to be closed, meaning that
392+
// `writer` is going to get destroyed soon as this return
393+
// value propagates up the stack. There's no need to write
394+
// the default value, so set this to `false`.
395+
writer.should_write_default_value = false;
324396
WriteComplete::Closed(value)
325397
} else {
326398
WriteComplete::Cancelled(value)
@@ -338,6 +410,9 @@ where
338410
// Afterwards the `cleanup` itself is naturally dropped and cleaned
339411
// up.
340412
ReturnCode::Completed(1) | ReturnCode::Closed(1) | ReturnCode::Cancelled(1) => {
413+
// A value was written, so no need to write the default value.
414+
writer.should_write_default_value = false;
415+
341416
// SAFETY: we're the ones managing `ptr` so we know it's safe to
342417
// pass here.
343418
unsafe {
@@ -456,7 +531,7 @@ pub enum FutureWriteCancel<T: 'static> {
456531
Cancelled(T, FutureWriter<T>),
457532
}
458533

459-
/// Represents the readable end of a Component Model `future`.
534+
/// Represents the readable end of a Component Model `future<T>`.
460535
pub struct FutureReader<T: 'static> {
461536
handle: AtomicU32,
462537
vtable: &'static FutureVtable<T>,
@@ -499,12 +574,11 @@ impl<T> FutureReader<T> {
499574
}
500575

501576
impl<T> IntoFuture for FutureReader<T> {
502-
type Output = Option<T>;
577+
type Output = T;
503578
type IntoFuture = FutureRead<T>;
504579

505580
/// Convert this object into a `Future` which will resolve when a value is
506-
/// written to the writable end of this `future` (yielding a `Some` result)
507-
/// or when the writable end is dropped (yielding a `None` result).
581+
/// written to the writable end of this `future`.
508582
fn into_future(self) -> Self::IntoFuture {
509583
FutureRead {
510584
op: WaitableOperation::new(self),
@@ -536,7 +610,6 @@ struct FutureReadOp<T>(marker::PhantomData<T>);
536610

537611
enum ReadComplete<T> {
538612
Value(T),
539-
Closed,
540613
Cancelled,
541614
}
542615

@@ -547,7 +620,7 @@ where
547620
type Start = FutureReader<T>;
548621
type InProgress = (FutureReader<T>, Option<Cleanup>);
549622
type Result = (ReadComplete<T>, FutureReader<T>);
550-
type Cancel = Result<Option<T>, FutureReader<T>>;
623+
type Cancel = Result<T, FutureReader<T>>;
551624

552625
fn start(reader: Self::Start) -> (u32, Self::InProgress) {
553626
let (ptr, cleanup) = Cleanup::new(reader.vtable.layout);
@@ -570,13 +643,9 @@ where
570643
match ReturnCode::decode(code) {
571644
ReturnCode::Blocked => Err((reader, cleanup)),
572645

573-
// The read didn't complete, so `cleanup` is still uninitialized, so
574-
// let it fall out of scope.
575-
ReturnCode::Closed(0) => Ok((ReadComplete::Closed, reader)),
576-
577-
// Like `in_progress_closed` the read operation has finished but
578-
// without a value, so let `cleanup` fall out of scope to clean up
579-
// its allocation.
646+
// Let `cleanup` fall out of scope to clean up its allocation here,
647+
// and otherwise tahe reader is plumbed through to possibly restart
648+
// the read in the future.
580649
ReturnCode::Cancelled(0) => Ok((ReadComplete::Cancelled, reader)),
581650

582651
// The read has completed, so lift the value from the stored memory and
@@ -613,28 +682,27 @@ where
613682
fn result_into_cancel((value, reader): Self::Result) -> Self::Cancel {
614683
match value {
615684
// The value was actually read, so thread that through here.
616-
ReadComplete::Value(value) => Ok(Some(value)),
685+
ReadComplete::Value(value) => Ok(value),
617686

618687
// The read was successfully cancelled, so thread through the
619688
// `reader` to possibly restart later on.
620689
ReadComplete::Cancelled => Err(reader),
621-
622-
// The other end was closed, so this can't possibly ever complete
623-
// again, so thread that through.
624-
ReadComplete::Closed => Ok(None),
625690
}
626691
}
627692
}
628693

629694
impl<T: 'static> Future for FutureRead<T> {
630-
type Output = Option<T>;
695+
type Output = T;
631696

632697
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
633698
self.pin_project()
634699
.poll_complete(cx)
635700
.map(|(result, _reader)| match result {
636-
ReadComplete::Value(val) => Some(val),
637-
ReadComplete::Cancelled | ReadComplete::Closed => None,
701+
ReadComplete::Value(val) => val,
702+
// This is only possible if, after calling `FutureRead::cancel`,
703+
// the future is polled again. The `cancel` method is documented
704+
// as "don't do that" so this is left to panic.
705+
ReadComplete::Cancelled => panic!("cannot poll after cancelling"),
638706
})
639707
}
640708
}
@@ -650,21 +718,17 @@ impl<T> FutureRead<T> {
650718
///
651719
/// Return values include:
652720
///
653-
/// * `Ok(Some(value))` - future completed before this cancellation request
721+
/// * `Ok(value)` - future completed before this cancellation request
654722
/// was received.
655-
/// * `Ok(None)` - future closed before this cancellation request was
656-
/// received.
657723
/// * `Err(reader)` - read operation was cancelled and it can be retried in
658724
/// the future if desired.
659725
///
660-
/// Note that if this method is called after the write was already cancelled
661-
/// then `Ok(None)` will be returned.
662-
///
663726
/// # Panics
664727
///
665728
/// Panics if the operation has already been completed via `Future::poll`,
666-
/// or if this method is called twice.
667-
pub fn cancel(self: Pin<&mut Self>) -> Result<Option<T>, FutureReader<T>> {
729+
/// or if this method is called twice. Additionally if this method completes
730+
/// then calling `poll` again on `self` will panic.
731+
pub fn cancel(self: Pin<&mut Self>) -> Result<T, FutureReader<T>> {
668732
self.pin_project().cancel()
669733
}
670734
}

crates/rust/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -452,8 +452,11 @@ pub mod wit_future {{
452452
self.src.push_str(&format!(
453453
"\
454454
/// Creates a new Component Model `future` with the specified payload type.
455-
pub fn new<T: FuturePayload>() -> ({async_support}::FutureWriter<T>, {async_support}::FutureReader<T>) {{
456-
unsafe {{ {async_support}::future_new::<T>(T::VTABLE) }}
455+
///
456+
/// The `default` function provided computes the default value to be sent in
457+
/// this future if no other value was otherwise sent.
458+
pub fn new<T: FuturePayload>(default: fn() -> T) -> ({async_support}::FutureWriter<T>, {async_support}::FutureReader<T>) {{
459+
unsafe {{ {async_support}::future_new::<T>(default, T::VTABLE) }}
457460
}}
458461
}}
459462
",

tests/runtime-async/async/cancel-import/runner.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use wit_bindgen::yield_async;
99
fn main() {
1010
println!("test cancelling an import in progress");
1111
wit_bindgen::block_on(async {
12-
let (tx, rx) = wit_future::new();
12+
let (tx, rx) = wit_future::new(|| unreachable!());
1313
let mut import = Box::pin(pending_import(rx));
1414
assert!(import
1515
.as_mut()
@@ -21,16 +21,16 @@ fn main() {
2121

2222
println!("test cancelling an import before it starts");
2323
wit_bindgen::block_on(async {
24-
let (tx, rx) = wit_future::new();
24+
let (tx, rx) = wit_future::new(|| unreachable!());
2525
let import = Box::pin(pending_import(rx));
2626
drop(import);
2727
tx.write(()).await.unwrap_err();
2828
});
2929

3030
println!("test cancelling an import in the started state");
3131
wit_bindgen::block_on(async {
32-
let (tx1, rx1) = wit_future::new();
33-
let (tx2, rx2) = wit_future::new();
32+
let (tx1, rx1) = wit_future::new(|| unreachable!());
33+
let (tx2, rx2) = wit_future::new(|| unreachable!());
3434

3535
// create a task in the "started" state, but don't complete it yet
3636
let mut started_import = Box::pin(pending_import(rx1));
@@ -66,7 +66,7 @@ fn main() {
6666
println!("test cancellation with a status code saying it's done");
6767
wit_bindgen::block_on(async {
6868
// Start a subtask and get it into the "started" state
69-
let (tx, rx) = wit_future::new();
69+
let (tx, rx) = wit_future::new(|| unreachable!());
7070
let mut import = Box::pin(pending_import(rx));
7171
assert!(import
7272
.as_mut()
@@ -92,7 +92,7 @@ fn main() {
9292
println!("race cancellation with pending status code");
9393
wit_bindgen::block_on(async {
9494
// Start a subtask and get it into the "started" state
95-
let (tx1, rx1) = wit_future::new();
95+
let (tx1, rx1) = wit_future::new(|| unreachable!());
9696
let mut started_import = Box::pin(pending_import(rx1));
9797
assert!(started_import
9898
.as_mut()
@@ -102,7 +102,7 @@ fn main() {
102102
// force the next subtask to start out in the "starting" state, not the
103103
// "started" state.
104104
backpressure_set(true);
105-
let (tx2, rx2) = wit_future::new();
105+
let (tx2, rx2) = wit_future::new(|| unreachable!());
106106
let mut starting_import = Box::pin(pending_import(rx2));
107107
assert!(starting_import
108108
.as_mut()

tests/runtime-async/async/cancel-import/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export!(Component);
88

99
impl crate::exports::my::test::i::Guest for Component {
1010
async fn pending_import(x: FutureReader<()>) {
11-
x.await.unwrap();
11+
x.await
1212
}
1313

1414
fn backpressure_set(x: bool) {

0 commit comments

Comments
 (0)