Skip to content

Commit 0853cc5

Browse files
feat: support error context in stream/error operations (#1149)
* feat: support error context in stream/error operations Signed-off-by: Victor Adossi <vadossi@cosmonic.com> * fix: ensure late drop Signed-off-by: Victor Adossi <vadossi@cosmonic.com> --------- Signed-off-by: Victor Adossi <vadossi@cosmonic.com>
1 parent cd5e771 commit 0853cc5

File tree

4 files changed

+193
-75
lines changed

4 files changed

+193
-75
lines changed

crates/guest-rust/rt/src/async_support.rs

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ pub enum Handle {
5353
LocalClosed,
5454
Read,
5555
Write,
56+
// Local end is closed with an error
57+
// NOTE: this is only valid for write ends
58+
WriteClosedErr(Option<ErrorContext>),
5659
}
5760

5861
/// The current task being polled (or null if none).
@@ -176,7 +179,7 @@ pub async unsafe fn await_result(
176179
STATUS_RETURNED | STATUS_DONE => {
177180
alloc::dealloc(params, params_layout);
178181
}
179-
_ => unreachable!(),
182+
_ => unreachable!("unrecognized async call status"),
180183
}
181184
}
182185

@@ -187,26 +190,50 @@ mod results {
187190
pub const CANCELED: u32 = 0;
188191
}
189192

193+
/// Result of awaiting a asynchronous read or write
194+
#[doc(hidden)]
195+
pub enum AsyncWaitResult {
196+
/// Used when a value was successfully sent or received
197+
Values(usize),
198+
/// Represents a successful but error-indicating read
199+
Error(u32),
200+
/// Represents a failed read (closed, canceled, etc)
201+
End,
202+
}
203+
204+
impl AsyncWaitResult {
205+
/// Interpret the results from an async operation that is known to *not* be blocked
206+
fn from_nonblocked_async_result(v: u32) -> Self {
207+
match v {
208+
results::CLOSED | results::CANCELED => Self::End,
209+
v => {
210+
if v & results::CLOSED != 0 {
211+
Self::Error(v & !results::CLOSED)
212+
} else {
213+
Self::Values(v as usize)
214+
}
215+
}
216+
}
217+
}
218+
}
219+
190220
/// Await the completion of a future read or write.
191221
#[doc(hidden)]
192222
pub async unsafe fn await_future_result(
193223
import: unsafe extern "C" fn(u32, *mut u8) -> u32,
194224
future: u32,
195225
address: *mut u8,
196-
) -> bool {
226+
) -> AsyncWaitResult {
197227
let result = import(future, address);
198228
match result {
199229
results::BLOCKED => {
200230
assert!(!CURRENT.is_null());
201231
(*CURRENT).todo += 1;
202232
let (tx, rx) = oneshot::channel();
203233
CALLS.insert(future as _, tx);
204-
let v = rx.await.unwrap();
205-
v == 1
234+
AsyncWaitResult::from_nonblocked_async_result(rx.await.unwrap())
206235
}
207-
results::CLOSED | results::CANCELED => false,
208-
1 => true,
209-
_ => unreachable!(),
236+
v => AsyncWaitResult::from_nonblocked_async_result(v),
210237
}
211238
}
212239

@@ -217,7 +244,7 @@ pub async unsafe fn await_stream_result(
217244
stream: u32,
218245
address: *mut u8,
219246
count: u32,
220-
) -> Option<usize> {
247+
) -> AsyncWaitResult {
221248
let result = import(stream, address, count);
222249
match result {
223250
results::BLOCKED => {
@@ -227,13 +254,12 @@ pub async unsafe fn await_stream_result(
227254
CALLS.insert(stream as _, tx);
228255
let v = rx.await.unwrap();
229256
if let results::CLOSED | results::CANCELED = v {
230-
None
257+
AsyncWaitResult::End
231258
} else {
232-
Some(usize::try_from(v).unwrap())
259+
AsyncWaitResult::Values(usize::try_from(v).unwrap())
233260
}
234261
}
235-
results::CLOSED | results::CANCELED => None,
236-
v => Some(usize::try_from(v).unwrap()),
262+
v => AsyncWaitResult::from_nonblocked_async_result(v),
237263
}
238264
}
239265

@@ -310,6 +336,7 @@ pub unsafe fn callback(ctx: *mut u8, event0: i32, event1: i32, event2: i32) -> i
310336
}
311337

312338
/// Represents the Component Model `error-context` type.
339+
#[derive(PartialEq, Eq)]
313340
pub struct ErrorContext {
314341
handle: u32,
315342
}

crates/guest-rust/rt/src/async_support/future_support.rs

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
extern crate std;
22

33
use {
4+
super::ErrorContext,
45
super::Handle,
56
futures::{
67
channel::oneshot,
@@ -20,10 +21,10 @@ use {
2021
#[doc(hidden)]
2122
pub struct FutureVtable<T> {
2223
pub write: fn(future: u32, value: T) -> Pin<Box<dyn Future<Output = bool>>>,
23-
pub read: fn(future: u32) -> Pin<Box<dyn Future<Output = Option<T>>>>,
24+
pub read: fn(future: u32) -> Pin<Box<dyn Future<Output = Option<Result<T, ErrorContext>>>>>,
2425
pub cancel_write: fn(future: u32),
2526
pub cancel_read: fn(future: u32),
26-
pub close_writable: fn(future: u32),
27+
pub close_writable: fn(future: u32, err_ctx: u32),
2728
pub close_readable: fn(future: u32),
2829
}
2930

@@ -78,7 +79,8 @@ impl<T> CancelableWrite<T> {
7879
Handle::LocalOpen
7980
| Handle::LocalWaiting(_)
8081
| Handle::Read
81-
| Handle::LocalClosed => unreachable!(),
82+
| Handle::LocalClosed
83+
| Handle::WriteClosedErr(_) => unreachable!(),
8284
Handle::LocalReady(..) => {
8385
entry.insert(Handle::LocalOpen);
8486
}
@@ -126,7 +128,9 @@ impl<T> FutureWriter<T> {
126128
Poll::Pending
127129
}
128130
Handle::LocalReady(..) => Poll::Pending,
129-
Handle::LocalClosed => Poll::Ready(()),
131+
Handle::LocalClosed | Handle::WriteClosedErr(_) => {
132+
Poll::Ready(())
133+
}
130134
Handle::LocalWaiting(_) | Handle::Read | Handle::Write => {
131135
unreachable!()
132136
}
@@ -141,13 +145,29 @@ impl<T> FutureWriter<T> {
141145
_ = tx.send(Box::new(v));
142146
Box::pin(future::ready(()))
143147
}
144-
Handle::LocalClosed => Box::pin(future::ready(())),
148+
Handle::LocalClosed | Handle::WriteClosedErr(_) => Box::pin(future::ready(())),
145149
Handle::Read | Handle::LocalReady(..) => unreachable!(),
146150
Handle::Write => Box::pin((vtable.write)(handle, v).map(drop)),
147151
},
148152
}),
149153
}
150154
}
155+
156+
/// Close the writer with an error that will be returned as the last value
157+
///
158+
/// Note that this error is not sent immediately, but only when the
159+
/// writer closes, which is normally a result of a `drop()`
160+
pub fn close_with_error(&mut self, err: ErrorContext) {
161+
super::with_entry(self.handle, move |entry| match entry {
162+
Entry::Vacant(_) => unreachable!(),
163+
Entry::Occupied(mut entry) => match entry.get_mut() {
164+
// Regardless of current state, put the writer into a closed with error state
165+
_ => {
166+
entry.insert(Handle::WriteClosedErr(Some(err)));
167+
}
168+
},
169+
});
170+
}
151171
}
152172

153173
impl<T> Drop for FutureWriter<T> {
@@ -161,8 +181,17 @@ impl<T> Drop for FutureWriter<T> {
161181
Handle::Read => unreachable!(),
162182
Handle::Write | Handle::LocalClosed => {
163183
entry.remove();
164-
(self.vtable.close_writable)(self.handle);
184+
(self.vtable.close_writable)(self.handle, 0);
165185
}
186+
Handle::WriteClosedErr(_) => match entry.remove() {
187+
Handle::WriteClosedErr(None) => {
188+
(self.vtable.close_writable)(self.handle, 0);
189+
}
190+
Handle::WriteClosedErr(Some(err_ctx)) => {
191+
(self.vtable.close_writable)(self.handle, err_ctx.handle());
192+
}
193+
_ => unreachable!(),
194+
},
166195
},
167196
});
168197
}
@@ -171,13 +200,13 @@ impl<T> Drop for FutureWriter<T> {
171200
/// Represents a read operation which may be canceled prior to completion.
172201
pub struct CancelableRead<T: 'static> {
173202
reader: Option<FutureReader<T>>,
174-
future: Pin<Box<dyn Future<Output = Option<T>>>>,
203+
future: Pin<Box<dyn Future<Output = Option<Result<T, ErrorContext>>>>>,
175204
}
176205

177206
impl<T> Future for CancelableRead<T> {
178-
type Output = Option<T>;
207+
type Output = Option<Result<T, ErrorContext>>;
179208

180-
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<T>> {
209+
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T, ErrorContext>>> {
181210
let me = self.get_mut();
182211
match me.future.poll_unpin(cx) {
183212
Poll::Ready(v) => {
@@ -206,7 +235,8 @@ impl<T> CancelableRead<T> {
206235
Handle::LocalOpen
207236
| Handle::LocalReady(..)
208237
| Handle::Write
209-
| Handle::LocalClosed => unreachable!(),
238+
| Handle::LocalClosed
239+
| Handle::WriteClosedErr(_) => unreachable!(),
210240
Handle::LocalWaiting(_) => {
211241
entry.insert(Handle::LocalOpen);
212242
}
@@ -262,7 +292,8 @@ impl<T> FutureReader<T> {
262292
| Handle::LocalOpen
263293
| Handle::LocalReady(..)
264294
| Handle::LocalWaiting(_)
265-
| Handle::LocalClosed => {
295+
| Handle::LocalClosed
296+
| Handle::WriteClosedErr(_) => {
266297
unreachable!()
267298
}
268299
},
@@ -286,7 +317,10 @@ impl<T> FutureReader<T> {
286317
Handle::Read | Handle::LocalClosed => {
287318
entry.remove();
288319
}
289-
Handle::LocalReady(..) | Handle::LocalWaiting(_) | Handle::Write => unreachable!(),
320+
Handle::LocalReady(..)
321+
| Handle::LocalWaiting(_)
322+
| Handle::Write
323+
| Handle::WriteClosedErr(_) => unreachable!(),
290324
},
291325
});
292326

@@ -295,7 +329,7 @@ impl<T> FutureReader<T> {
295329
}
296330

297331
impl<T> IntoFuture for FutureReader<T> {
298-
type Output = Option<T>;
332+
type Output = Option<Result<T, ErrorContext>>;
299333
type IntoFuture = CancelableRead<T>;
300334

301335
/// Convert this object into a `Future` which will resolve when a value is
@@ -308,8 +342,10 @@ impl<T> IntoFuture for FutureReader<T> {
308342
reader: Some(self),
309343
future: super::with_entry(handle, |entry| match entry {
310344
Entry::Vacant(_) => unreachable!(),
311-
Entry::Occupied(mut entry) => match entry.get() {
312-
Handle::Write | Handle::LocalWaiting(_) => unreachable!(),
345+
Entry::Occupied(mut entry) => match entry.get_mut() {
346+
Handle::Write | Handle::LocalWaiting(_) => {
347+
unreachable!()
348+
}
313349
Handle::Read => Box::pin(async move { (vtable.read)(handle).await })
314350
as Pin<Box<dyn Future<Output = _>>>,
315351
Handle::LocalOpen => {
@@ -318,6 +354,10 @@ impl<T> IntoFuture for FutureReader<T> {
318354
Box::pin(async move { rx.await.ok().map(|v| *v.downcast().unwrap()) })
319355
}
320356
Handle::LocalClosed => Box::pin(future::ready(None)),
357+
Handle::WriteClosedErr(err_ctx) => match err_ctx.take() {
358+
None => Box::pin(future::ready(None)),
359+
Some(err_ctx) => Box::pin(future::ready(Some(Err(err_ctx)))),
360+
},
321361
Handle::LocalReady(..) => {
322362
let Handle::LocalReady(v, waker) = entry.insert(Handle::LocalClosed) else {
323363
unreachable!()
@@ -353,7 +393,7 @@ impl<T> Drop for FutureReader<T> {
353393
entry.remove();
354394
(self.vtable.close_readable)(handle);
355395
}
356-
Handle::Write => unreachable!(),
396+
Handle::Write | Handle::WriteClosedErr(_) => unreachable!(),
357397
},
358398
});
359399
}

0 commit comments

Comments
 (0)