Skip to content

Commit 67d9add

Browse files
committed
CABI: refactor/simplify stream code (no change)
1 parent b6be91e commit 67d9add

File tree

3 files changed

+147
-142
lines changed

3 files changed

+147
-142
lines changed

design/mvp/CanonicalABI.md

Lines changed: 93 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1287,12 +1287,12 @@ calling core wasm code receives the `STREAM_READ` progress event (in which case
12871287
`RevokeBuffer` is called). This reduces the number of task-switches required
12881288
by the spec, particularly when streaming between two components.
12891289

1290-
The `ReadableStreamGuestImpl` class implements `ReadableStream` for streams
1291-
created by wasm (via `stream.new`) and tracks the common state shared by both
1292-
the readable and writable ends of streams (defined below). Introducing the
1293-
class in chunks, starting with the fields and initialization:
1290+
The `SharedStreamImpl` class implements `ReadableStream` for streams created by
1291+
wasm (via `stream.new`) and tracks the common state shared by both the readable
1292+
and writable ends of streams (defined below). Introducing the class in chunks,
1293+
starting with the fields and initialization:
12941294
```python
1295-
class ReadableStreamGuestImpl(ReadableStream):
1295+
class SharedStreamImpl(ReadableStream):
12961296
closed_: bool
12971297
pending_inst: Optional[ComponentInstance]
12981298
pending_buffer: Optional[Buffer]
@@ -1420,30 +1420,30 @@ entirely symmetric, with the only difference being whether the polymorphic
14201420
whether there is an asynchronous read or write in progress and is maintained by
14211421
the definitions of `stream.{read,write}` below. Importantly, `copying` and the
14221422
inherited fields of `Waitable` are per-*end*, not per-*stream* (unlike the
1423-
fields of `ReadableStreamGuestImpl` shown above, which are per-stream and
1424-
shared by both ends via their common `stream` field).
1423+
fields of `SharedStreamImpl` shown above, which are per-stream and shared by
1424+
both ends via their `shared` field).
14251425
```python
14261426
class StreamEnd(Waitable):
1427-
stream: ReadableStream
1427+
shared: ReadableStream
14281428
copying: bool
14291429

1430-
def __init__(self, stream):
1430+
def __init__(self, shared):
14311431
Waitable.__init__(self)
1432-
self.stream = stream
1432+
self.shared = shared
14331433
self.copying = False
14341434

14351435
def drop(self):
14361436
trap_if(self.copying)
1437-
self.stream.close()
1437+
self.shared.close()
14381438
Waitable.drop(self)
14391439

14401440
class ReadableStreamEnd(StreamEnd):
14411441
def copy(self, inst, dst, on_partial_copy, on_copy_done):
1442-
return self.stream.read(inst, dst, on_partial_copy, on_copy_done)
1442+
return self.shared.read(inst, dst, on_partial_copy, on_copy_done)
14431443

14441444
class WritableStreamEnd(StreamEnd):
14451445
def copy(self, inst, src, on_partial_copy, on_copy_done):
1446-
return self.stream.write(inst, src, on_partial_copy, on_copy_done)
1446+
return self.shared.write(inst, src, on_partial_copy, on_copy_done)
14471447
```
14481448
Dropping a stream end while an asynchronous read or write is in progress traps
14491449
since the async read or write cannot be cancelled without blocking and `drop`
@@ -1453,8 +1453,8 @@ finish before closing.
14531453

14541454
The `{Readable,Writable}StreamEnd.copy` method is called polymorphically by the
14551455
shared definition of `stream.{read,write}` below. While the static type of
1456-
`StreamEnd.stream` is `ReadableStream`, a `WritableStreamEnd` always points to
1457-
a `ReadableStreamGuestImpl` object which is why `WritableStreamEnd.copy` can
1456+
`StreamEnd.shared` is `ReadableStream`, a `WritableStreamEnd` always points to
1457+
a `SharedStreamImpl` object which is why `WritableStreamEnd.copy` can
14581458
unconditionally call `stream.write`.
14591459

14601460

@@ -1470,20 +1470,20 @@ class FutureEnd(StreamEnd):
14701470
assert(buffer.remain() == 1)
14711471
def on_copy_done_wrapper(why):
14721472
if buffer.remain() == 0:
1473-
self.stream.close()
1473+
self.shared.close()
14741474
on_copy_done(why)
14751475
ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
14761476
if ret == 'done' and buffer.remain() == 0:
1477-
self.stream.close()
1477+
self.shared.close()
14781478
return ret
14791479

14801480
class ReadableFutureEnd(FutureEnd):
14811481
def copy(self, inst, dst, on_partial_copy, on_copy_done):
1482-
return self.close_after_copy(self.stream.read, inst, dst, on_copy_done)
1482+
return self.close_after_copy(self.shared.read, inst, dst, on_copy_done)
14831483

14841484
class WritableFutureEnd(FutureEnd):
14851485
def copy(self, inst, src, on_partial_copy, on_copy_done):
1486-
return self.close_after_copy(self.stream.write, inst, src, on_copy_done)
1486+
return self.close_after_copy(self.shared.write, inst, src, on_copy_done)
14871487
def drop(self):
14881488
FutureEnd.drop(self)
14891489
```
@@ -1971,9 +1971,9 @@ def lift_async_value(ReadableEndT, cx, i, t):
19711971
assert(not contains_borrow(t))
19721972
e = cx.inst.table.remove(i)
19731973
trap_if(not isinstance(e, ReadableEndT))
1974-
trap_if(e.stream.t != t)
1974+
trap_if(e.shared.t != t)
19751975
trap_if(e.copying)
1976-
return e.stream
1976+
return e.shared
19771977
```
19781978
Lifting transfers ownership of the readable end and traps if a read was in
19791979
progress (which would now be dangling).
@@ -3689,20 +3689,20 @@ readable end is subsequently transferred to another component (or the host) via
36893689
```python
36903690
async def canon_stream_new(stream_t, task):
36913691
trap_if(not task.inst.may_leave)
3692-
stream = ReadableStreamGuestImpl(stream_t.t)
3693-
ri = task.inst.table.add(ReadableStreamEnd(stream))
3694-
wi = task.inst.table.add(WritableStreamEnd(stream))
3692+
shared = SharedStreamImpl(stream_t.t)
3693+
ri = task.inst.table.add(ReadableStreamEnd(shared))
3694+
wi = task.inst.table.add(WritableStreamEnd(shared))
36953695
return [ ri | (wi << 32) ]
36963696

36973697
async def canon_future_new(future_t, task):
36983698
trap_if(not task.inst.may_leave)
3699-
future = ReadableStreamGuestImpl(future_t.t)
3700-
ri = task.inst.table.add(ReadableFutureEnd(future))
3701-
wi = task.inst.table.add(WritableFutureEnd(future))
3699+
shared = SharedStreamImpl(future_t.t)
3700+
ri = task.inst.table.add(ReadableFutureEnd(shared))
3701+
wi = task.inst.table.add(WritableFutureEnd(shared))
37023702
return [ ri | (wi << 32) ]
37033703
```
37043704
Because futures are just streams with extra limitations, here we see that a
3705-
`WritableFutureEnd` shares the same `ReadableStreamGuestImpl` type as
3705+
`WritableFutureEnd` shares the same `SharedStreamImpl` type as
37063706
`WritableStreamEnd`; the extra limitations are added by `WritableFutureEnd` and
37073707
the future built-ins below.
37083708

@@ -3756,80 +3756,87 @@ async def canon_future_write(future_t, opts, task, i, ptr):
37563756
future_t, opts, task, i, ptr, 1)
37573757
```
37583758

3759-
Introducing the `copy` function in chunks, `copy` first checks that the element
3760-
at index `i` is of the right type and that there is not already a copy in
3761-
progress. (In the future, this restriction could be relaxed, allowing a finite
3762-
number of pipelined reads or writes.) Then a readable or writable buffer is
3763-
created which (in `Buffer`'s constructor) eagerly checks the alignment and
3764-
bounds of (`i`, `n`).
3759+
Introducing the `copy` function in chunks, `copy` first checks that the
3760+
element at index `i` is of the right type and that there is not already a
3761+
copy in progress. (In the future, this restriction could be relaxed, allowing
3762+
a finite number of pipelined reads or writes.)
37653763
```python
37663764
async def copy(EndT, BufferT, event_code, stream_or_future_t, opts, task, i, ptr, n):
37673765
trap_if(not task.inst.may_leave)
37683766
e = task.inst.table.get(i)
37693767
trap_if(not isinstance(e, EndT))
3770-
trap_if(e.stream.t != stream_or_future_t.t)
3768+
trap_if(e.shared.t != stream_or_future_t.t)
37713769
trap_if(e.copying)
3770+
```
3771+
Then a readable or writable buffer is created which (in `Buffer`'s
3772+
constructor) eagerly checks the alignment and bounds of (`i`, `n`).
3773+
(In the future, the restriction on futures/streams containing `borrow`s could
3774+
be relaxed by maintaining sufficient bookkeeping state to ensure that
3775+
borrowed handles *or streams/futures of borrowed handles* could not outlive
3776+
their originating call.)
3777+
```python
37723778
assert(not contains_borrow(stream_or_future_t))
37733779
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
37743780
buffer = BufferT(stream_or_future_t.t, cx, ptr, n)
37753781
```
3776-
3777-
Next, in the synchronous case, `Task.wait_on` is used to synchronously and
3778-
uninterruptibly wait for the `on_*` callbacks to indicate that the copy has made
3779-
progress. In the case of `on_partial_copy`, this code carefully delays the call
3780-
to `revoke_buffer` until right before control flow is returned back to the
3781-
calling core wasm code. This enables another task to potentially complete
3782-
multiple partial copies before having to context-switch back.
3782+
Next, the `copy` method of `{Readable,Writable}{Stream,Future}End` is called
3783+
to attempt to perform the actual `read` or `write`. The `on_partial_copy`
3784+
callback passed to `copy` is called zero or more times each time values are
3785+
copied to/from `buffer` without filling it up. Aftewards, the `on_copy_done`
3786+
callback passed to `copy` is called at most once when: the `buffer` if full,
3787+
the other end closed, or this end cancelled the copy via
3788+
`{stream,future}.cancel-{read,write}`.
37833789
```python
3784-
if opts.sync:
3785-
final_revoke_buffer = None
3786-
def on_partial_copy(revoke_buffer, why = 'completed'):
3787-
assert(why == 'completed')
3788-
nonlocal final_revoke_buffer
3789-
final_revoke_buffer = revoke_buffer
3790-
if not async_copy.done():
3791-
async_copy.set_result(None)
3792-
on_copy_done = partial(on_partial_copy, lambda:())
3793-
if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) != 'done':
3794-
async_copy = asyncio.Future()
3795-
await task.wait_on(async_copy, sync = True)
3796-
final_revoke_buffer()
3797-
```
3798-
(When non-cooperative threads are added, the assertion that synchronous copies
3799-
can only be `completed`, and not `cancelled`, will no longer hold.)
3800-
3801-
In the asynchronous case, the `on_*` callbacks set a pending event on the
3802-
`Waitable` which will be delivered to core wasm when core wasm calls
3803-
`task.{wait,poll}` or, if using `callback`, returns to the event loop.
3804-
Symmetric to the synchronous case, this code carefully delays calling
3805-
`revoke_buffer` until the copy event is actually delivered to core wasm,
3806-
allowing multiple partial copies to complete in the interim, reducing overall
3807-
context-switching overhead.
3790+
def copy_event(why, revoke_buffer):
3791+
revoke_buffer()
3792+
e.copying = False
3793+
return (event_code, i, pack_copy_result(task, e, buffer, why))
3794+
3795+
def on_partial_copy(revoke_buffer):
3796+
e.set_event(partial(copy_event, 'completed', revoke_buffer))
3797+
3798+
def on_copy_done(why):
3799+
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
3800+
3801+
if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) == 'done':
3802+
return [pack_copy_result(task, e, buffer, 'completed')]
3803+
```
3804+
If the stream/future is already closed or at least 1 element could be
3805+
immediately copied, `copy` returns `'done'` and `{stream,future}.{read,write}`
3806+
synchronously returns how much was copied and how the operation ended to the
3807+
caller. Otherwise, the built-in blocks:
38083808
```python
38093809
else:
3810-
def copy_event(why, revoke_buffer):
3811-
revoke_buffer()
3812-
e.copying = False
3813-
return (event_code, i, pack_copy_result(task, e, buffer, why))
3814-
def on_partial_copy(revoke_buffer):
3815-
e.set_event(partial(copy_event, 'completed', revoke_buffer))
3816-
def on_copy_done(why):
3817-
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
3818-
if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) != 'done':
3810+
if opts.sync:
3811+
await task.wait_on(e.wait_for_pending_event(), sync = True)
3812+
code,index,payload = e.get_event()
3813+
assert(code == event_code and index == i)
3814+
return [payload]
3815+
else:
38193816
e.copying = True
38203817
return [BLOCKED]
3821-
return [pack_copy_result(task, e, buffer, 'completed')]
38223818
```
3823-
However the copy completes, the results are reported to the caller via
3824-
`pack_copy_result`:
3819+
In the synchronous case, the caller synchronously waits for progress
3820+
(blocking all execution in the calling component instance, but allowing other
3821+
tasks in other component instances to make progress). Note that `get_event()`
3822+
necessarily calls a `copy_event` closure created by either `on_partial_copy`
3823+
or `on_copy_done`. In the asynchronous case, the built-in immeditely returns
3824+
the `BLOCKED` code and the caller must asynchronously wait for progress using
3825+
`waitable-set.{wait,poll}` or, if using a `callback`, by returning to the event
3826+
loop. Setting `copying` prevents any more reads/writes from starting and also
3827+
prevents the stream/future from being closed.
3828+
3829+
Regardless of whether the `{stream,future}.{read,write}` completes
3830+
synchronously or asynchronously, the results passed to core wasm are
3831+
bit-packed into a single `i32` according to the following scheme:
38253832
```python
38263833
BLOCKED = 0xffff_ffff
38273834
COMPLETED = 0x0
38283835
CLOSED = 0x1
38293836
CANCELLED = 0x2
38303837

38313838
def pack_copy_result(task, e, buffer, why):
3832-
if e.stream.closed():
3839+
if e.shared.closed():
38333840
result = CLOSED
38343841
elif why == 'cancelled':
38353842
result = CANCELLED
@@ -3882,10 +3889,10 @@ async def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
38823889
trap_if(not task.inst.may_leave)
38833890
e = task.inst.table.get(i)
38843891
trap_if(not isinstance(e, EndT))
3885-
trap_if(e.stream.t != stream_or_future_t.t)
3892+
trap_if(e.shared.t != stream_or_future_t.t)
38863893
trap_if(not e.copying)
38873894
if not e.has_pending_event():
3888-
e.stream.cancel()
3895+
e.shared.cancel()
38893896
if not e.has_pending_event():
38903897
if sync:
38913898
await task.wait_on(e.wait_for_pending_event(), sync = True)
@@ -3896,10 +3903,10 @@ async def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
38963903
return [payload]
38973904
```
38983905
The *first* check for `e.has_pending_event()` catches the case where the copy has
3899-
already racily finished, in which case we must *not* call `stream.cancel()`.
3900-
Calling `stream.cancel()` may, but is not required to, recursively call one of
3901-
the `on_*` callbacks (passed by `canon_{stream,future}_{read,write}` above)
3902-
which will set a pending event that is caught by the *second* check for
3906+
already racily finished, in which case we must *not* call `cancel()`. Calling
3907+
`cancel()` may, but is not required to, recursively call one of the `on_*`
3908+
callbacks (passed by `canon_{stream,future}_{read,write}` above) which will set
3909+
a pending event that is caught by the *second* check for
39033910
`e.has_pending_event()`.
39043911

39053912
If the copy hasn't been cancelled, the synchronous case uses `Task.wait_on` to
@@ -3951,7 +3958,7 @@ async def close(EndT, stream_or_future_t, task, hi):
39513958
trap_if(not task.inst.may_leave)
39523959
e = task.inst.table.remove(hi)
39533960
trap_if(not isinstance(e, EndT))
3954-
trap_if(e.stream.t != stream_or_future_t.t)
3961+
trap_if(e.shared.t != stream_or_future_t.t)
39553962
e.drop()
39563963
return []
39573964
```

0 commit comments

Comments
 (0)